From c7b77d698baff5ad9927b5c14ec0d21e36622c3f Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 25 Jul 2024 18:26:51 -0700 Subject: [PATCH] chore(outbound): Simplify router test utilities (#3099) In preparation for adding more tests, this commit simplifies the test setup utilities so that they can be shared and reused across test modules. --- .../app/outbound/src/http/logical/tests.rs | 431 +++++------------- .../outbound/src/http/logical/tests/basic.rs | 52 +++ .../src/http/logical/tests/failure_accrual.rs | 264 +++++++++++ .../src/http/logical/tests/retries.rs | 278 ++++------- .../src/http/logical/tests/timeouts.rs | 237 ++++------ 5 files changed, 616 insertions(+), 646 deletions(-) create mode 100644 linkerd/app/outbound/src/http/logical/tests/basic.rs create mode 100644 linkerd/app/outbound/src/http/logical/tests/failure_accrual.rs diff --git a/linkerd/app/outbound/src/http/logical/tests.rs b/linkerd/app/outbound/src/http/logical/tests.rs index 3214e0f515..6006b9c687 100644 --- a/linkerd/app/outbound/src/http/logical/tests.rs +++ b/linkerd/app/outbound/src/http/logical/tests.rs @@ -1,321 +1,25 @@ use super::{policy, Outbound, ParentRef, Routes}; use crate::test_util::*; use linkerd_app_core::{ - errors, - exp_backoff::ExponentialBackoff, proxy::http::{self, BoxBody, HttpBody, StatusCode}, svc::{self, NewService, ServiceExt}, - trace, transport::addrs::*, Error, NameAddr, Result, }; use linkerd_proxy_client_policy as client_policy; use parking_lot::Mutex; use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; -use tokio::{sync::watch, task, time}; -use tracing::{info, Instrument}; +use tokio::sync::watch; +use tracing::Instrument; +mod basic; +mod failure_accrual; mod retries; mod timeouts; -const AUTHORITY: &str = "logical.test.svc.cluster.local"; -const PORT: u16 = 666; - type Request = http::Request; type Response = http::Response; -#[tokio::test(flavor = "current_thread", start_paused = true)] -async fn routes() { - let _trace = trace::test::trace_init(); - - let addr = SocketAddr::new([192, 0, 2, 41].into(), PORT); - let dest: NameAddr = format!("{AUTHORITY}:{PORT}") - .parse::() - .expect("dest addr is valid"); - let (svc, mut handle) = tower_test::mock::pair(); - let connect = HttpConnect::default().service(addr, svc); - let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default()); - let (rt, _shutdown) = runtime(); - let stack = Outbound::new(default_config(), rt, &mut Default::default()) - .with_stack(svc::ArcNewService::new(connect)) - .push_http_cached(resolve) - .into_inner(); - - let backend = default_backend(&dest); - let (_route_tx, routes) = - watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { - addr: dest.into(), - meta: ParentRef(client_policy::Meta::new_default("parent")), - backends: Arc::new([backend.clone()]), - routes: Arc::new([default_route(backend)]), - failure_accrual: client_policy::FailureAccrual::None, - }))); - let target = Target { - num: 1, - version: http::Version::H2, - routes, - }; - let svc = stack.new_service(target); - - handle.allow(1); - let rsp = send_req(svc.clone(), http_get()); - serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await; - assert_eq!( - rsp.await.expect("request must succeed").status(), - http::StatusCode::OK - ); -} - -#[tokio::test(flavor = "current_thread", start_paused = true)] -async fn consecutive_failures_accrue() { - let _trace = trace::test::with_default_filter(format!("{},trace", trace::test::DEFAULT_LOG)); - - let addr = SocketAddr::new([192, 0, 2, 41].into(), PORT); - let dest: NameAddr = format!("{AUTHORITY}:{PORT}") - .parse::() - .expect("dest addr is valid"); - let (svc, mut handle) = tower_test::mock::pair(); - let connect = HttpConnect::default().service(addr, svc); - let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default()); - let (rt, _shutdown) = runtime(); - let cfg = default_config(); - let stack = Outbound::new(cfg.clone(), rt, &mut Default::default()) - .with_stack(svc::ArcNewService::new(connect)) - .push_http_cached(resolve) - .into_inner(); - - let backend = default_backend(&dest); - // Ensure that the probe delay is longer than the failfast timeout, so that - // the service is only probed after it has entered failfast when the gate - // shuts. - let min_backoff = cfg.http_request_queue.failfast_timeout + Duration::from_secs(1); - let backoff = ExponentialBackoff::try_new( - min_backoff, - min_backoff * 6, - // no jitter --- ensure the test is deterministic - 0.0, - ) - .unwrap(); - let mut backoffs = backoff.stream(); - let (_route_tx, routes) = - watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { - addr: dest.into(), - meta: ParentRef(client_policy::Meta::new_default("parent")), - backends: Arc::new([backend.clone()]), - routes: Arc::new([default_route(backend)]), - failure_accrual: client_policy::FailureAccrual::ConsecutiveFailures { - max_failures: 3, - backoff, - }, - }))); - let target = Target { - num: 1, - version: http::Version::H2, - routes, - }; - let svc = stack.new_service(target); - - info!("Sending good request"); - handle.allow(1); - let rsp = send_req(svc.clone(), http_get()); - serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await; - assert_rsp(rsp, StatusCode::OK, "good").await; - - // fail 3 requests so that we hit the consecutive failures accrual limit - for i in 1..=3 { - info!("Sending bad request {i}/3"); - handle.allow(1); - let rsp = send_req(svc.clone(), http_get()); - serve_req( - &mut handle, - mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "bad"), - ) - .await; - assert_rsp(rsp, StatusCode::INTERNAL_SERVER_ERROR, "bad").await; - } - - // Ensure that the error is because of the breaker, and not because the - // underlying service doesn't poll ready. - info!("Sending request while in failfast"); - handle.allow(1); - // We are now in failfast. - let error = send_req(svc.clone(), http_get()) - .await - .expect_err("service should be in failfast"); - assert!( - errors::is_caused_by::(error.as_ref()), - "service should be in failfast" - ); - - info!("Sending request while in loadshed"); - let error = send_req(svc.clone(), http_get()) - .await - .expect_err("service should be in failfast"); - assert!( - errors::is_caused_by::(error.as_ref()), - "service should be in failfast" - ); - - // After the probation period, a subsequent request should be failed by - // hitting the service. - info!("Waiting for probation"); - backoffs.next().await; - task::yield_now().await; - - info!("Sending a bad request while in probation"); - handle.allow(1); - let rsp = send_req(svc.clone(), http_get()); - info!("Serving response"); - tokio::time::timeout( - time::Duration::from_secs(10), - serve_req( - &mut handle, - mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "bad"), - ), - ) - .await - .expect("no timeouts"); - assert_rsp(rsp, StatusCode::INTERNAL_SERVER_ERROR, "bad").await; - - // We are now in failfast. - info!("Sending a failfast request while the circuit is broken"); - handle.allow(1); - let error = send_req(svc.clone(), http_get()) - .await - .expect_err("service should be in failfast"); - assert!( - errors::is_caused_by::(error.as_ref()), - "service should be in failfast" - ); - - // Wait out the probation period again - info!("Waiting for probation again"); - backoffs.next().await; - task::yield_now().await; - - // The probe request succeeds - info!("Sending a good request while in probation"); - handle.allow(1); - let rsp = send_req(svc.clone(), http_get()); - tokio::time::timeout( - time::Duration::from_secs(10), - serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")), - ) - .await - .expect("no timeouts"); - assert_rsp(rsp, StatusCode::OK, "good").await; - - // The gate is now open again - info!("Sending a final good request"); - handle.allow(1); - let rsp = send_req(svc.clone(), http_get()); - tokio::time::timeout( - time::Duration::from_secs(10), - serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")), - ) - .await - .expect("no timeouts"); - assert_rsp(rsp, StatusCode::OK, "good").await; -} - -#[tokio::test(flavor = "current_thread", start_paused = true)] -async fn balancer_doesnt_select_tripped_breakers() { - let _trace = trace::test::with_default_filter(format!( - "{},linkerd_app_outbound=trace,linkerd_stack=trace,linkerd2_proxy_http_balance=trace", - trace::test::DEFAULT_LOG - )); - - let addr1 = SocketAddr::new([192, 0, 2, 41].into(), PORT); - let addr2 = SocketAddr::new([192, 0, 2, 42].into(), PORT); - let dest: NameAddr = format!("{AUTHORITY}:{PORT}") - .parse::() - .expect("dest addr is valid"); - let (svc1, mut handle1) = tower_test::mock::pair(); - let (svc2, mut handle2) = tower_test::mock::pair(); - let connect = HttpConnect::default() - .service(addr1, svc1) - .service(addr2, svc2); - let resolve = support::resolver(); - let mut dest_tx = resolve.endpoint_tx(dest.clone()); - dest_tx - .add([(addr1, Default::default()), (addr2, Default::default())]) - .unwrap(); - let (rt, _shutdown) = runtime(); - let cfg = default_config(); - let stack = Outbound::new(cfg.clone(), rt, &mut Default::default()) - .with_stack(svc::ArcNewService::new(connect)) - .push_http_cached(resolve) - .into_inner(); - - let backend = default_backend(&dest); - // Ensure that the probe delay is longer than the failfast timeout, so that - // the service is only probed after it has entered failfast when the gate - // shuts. - let min_backoff = cfg.http_request_queue.failfast_timeout + Duration::from_secs(1); - let backoff = ExponentialBackoff::try_new( - min_backoff, - min_backoff * 6, - // no jitter --- ensure the test is deterministic - 0.0, - ) - .unwrap(); - let (_route_tx, routes) = - watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { - addr: dest.into(), - meta: ParentRef(client_policy::Meta::new_default("parent")), - backends: Arc::new([backend.clone()]), - routes: Arc::new([default_route(backend)]), - failure_accrual: client_policy::FailureAccrual::ConsecutiveFailures { - max_failures: 3, - backoff, - }, - }))); - let target = Target { - num: 1, - version: http::Version::H2, - routes, - }; - let svc = stack.new_service(target); - - // fail 3 requests so that we hit the consecutive failures accrual limit - let mut failed = 0; - while failed < 3 { - handle1.allow(1); - handle2.allow(1); - info!(failed); - let rsp = send_req(svc.clone(), http_get()); - let (expected_status, expected_body) = tokio::select! { - _ = serve_req(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")) => { - info!("Balancer selected good endpoint"); - (StatusCode::OK, "endpoint 1") - } - _ = serve_req(&mut handle2, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "endpoint 2")) => { - info!("Balancer selected bad endpoint"); - failed += 1; - (StatusCode::INTERNAL_SERVER_ERROR, "endpoint 2") - } - }; - assert_rsp(rsp, expected_status, expected_body).await; - task::yield_now().await; - } - - handle1.allow(1); - handle2.allow(1); - let rsp = send_req(svc.clone(), http_get()); - // The load balancer will select endpoint 1, because endpoint 2 isn't ready. - serve_req(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await; - assert_rsp(rsp, StatusCode::OK, "endpoint 1").await; - - // The load balancer should continue selecting the non-failing endpoint. - for _ in 0..8 { - handle1.allow(1); - handle2.allow(1); - let rsp = send_req(svc.clone(), http_get()); - serve_req(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await; - assert_rsp(rsp, StatusCode::OK, "endpoint 1").await; - } -} - // === Utils === #[derive(Clone, Debug)] @@ -383,7 +87,7 @@ impl>> svc::NewService for HttpConnect { } } -// === +// === Utils === #[track_caller] fn send_req( @@ -415,15 +119,15 @@ fn send_req( async move { rsp.await.expect("request task must not panic") } } -fn mk_rsp(status: StatusCode, body: impl ToString) -> Response { - http::Response::builder() +async fn mk_rsp(status: StatusCode, body: impl ToString) -> Result { + Ok(http::Response::builder() .status(status) .body(http::BoxBody::new(body.to_string())) - .unwrap() + .unwrap()) } -fn mk_grpc_rsp(code: tonic::Code) -> Response { - http::Response::builder() +async fn mk_grpc_rsp(code: tonic::Code) -> Result { + Ok(http::Response::builder() .version(::http::Version::HTTP_2) .header( "content-type", @@ -434,7 +138,7 @@ fn mk_grpc_rsp(code: tonic::Code) -> Response { trls.insert("grpc-status", (code as u8).to_string().parse().unwrap()); Ok(Some(trls)) }))) - .unwrap() + .unwrap()) } async fn assert_rsp( @@ -452,14 +156,9 @@ async fn assert_rsp( assert_eq!(body, expected_body, "expected body to be {expected_body:?}"); } -async fn serve_req(handle: &mut tower_test::mock::Handle, rsp: Response) { - serve_delayed(Duration::ZERO, handle, Ok(rsp)).await; -} - -async fn serve_delayed( - delay: Duration, +async fn serve( handle: &mut tower_test::mock::Handle, - rsp: Result, + call: impl Future> + Send + 'static, ) { let (mut req, tx) = handle .next_request() @@ -472,24 +171,20 @@ async fn serve_delayed( while let Some(res) = req.body_mut().data().await { res.expect("request body must not error"); } - } - if !req.body().is_end_stream() { - req.body_mut() - .trailers() - .await - .expect("request body must not error"); + if !req.body().is_end_stream() { + req.body_mut() + .trailers() + .await + .expect("request body must not error"); + } } drop(req); tokio::spawn( async move { - if delay > Duration::ZERO { - tracing::debug!(?delay, "Sleeping"); - tokio::time::sleep(delay).await; - } - - tracing::debug!(?rsp, "Sending response"); - match rsp { + let res = call.await; + tracing::debug!(?res, "Sending response"); + match res { Ok(rsp) => tx.send_response(rsp), Err(e) => tx.send_error(e), } @@ -547,3 +242,85 @@ fn default_route(backend: client_policy::Backend) -> client_policy::http::Route }], } } + +type Handle = tower_test::mock::Handle; + +fn mock_http(params: client_policy::http::RouteParams) -> (svc::BoxCloneHttp, Handle) { + let dest = "example.com:1234".parse::().unwrap(); + let backend = default_backend(&dest); + let route = mk_route(backend.clone(), params); + mock(policy::Params::Http(policy::HttpParams { + addr: dest.into(), + meta: ParentRef(client_policy::Meta::new_default("parent")), + backends: Arc::new([backend]), + routes: Arc::new([route]), + failure_accrual: client_policy::FailureAccrual::None, + })) +} + +fn mock_grpc(params: client_policy::grpc::RouteParams) -> (svc::BoxCloneHttp, Handle) { + let dest = "example.com:1234".parse::().unwrap(); + let backend = default_backend(&dest); + let route = mk_route(backend.clone(), params); + mock(policy::Params::Grpc(policy::GrpcParams { + addr: dest.into(), + meta: ParentRef(client_policy::Meta::new_default("parent")), + backends: Arc::new([backend]), + routes: Arc::new([route]), + failure_accrual: client_policy::FailureAccrual::None, + })) +} + +fn mock(params: policy::Params) -> (svc::BoxCloneHttp, Handle) { + let (inner, handle) = tower_test::mock::pair(); + + let addr = SocketAddr::new([192, 0, 2, 41].into(), 1234); + let connect = HttpConnect::default().service(addr, inner); + let resolve = support::resolver().endpoint_exists( + params.addr().name_addr().unwrap().clone(), + addr, + Default::default(), + ); + let (rt, shutdown) = runtime(); + let stack = Outbound::new(default_config(), rt, &mut Default::default()) + .with_stack(svc::ArcNewService::new(connect)) + .push_http_cached(resolve) + .into_inner(); + + let (tx, routes) = watch::channel(Routes::Policy(params)); + tokio::spawn(async move { + tx.closed().await; + drop(shutdown); + }); + + let svc = stack.new_service(Target { + num: 1, + version: http::Version::H2, + routes, + }); + + (svc, handle) +} + +fn mk_route( + backend: client_policy::Backend, + params: P, +) -> client_policy::route::Route> { + use client_policy::*; + + route::Route { + hosts: vec![], + rules: vec![route::Rule { + matches: vec![M::default()], + policy: RoutePolicy { + meta: Meta::new_default("route"), + filters: [].into(), + distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { + filters: [].into(), + backend, + }])), + params, + }, + }], + } +} diff --git a/linkerd/app/outbound/src/http/logical/tests/basic.rs b/linkerd/app/outbound/src/http/logical/tests/basic.rs new file mode 100644 index 0000000000..1184e42686 --- /dev/null +++ b/linkerd/app/outbound/src/http/logical/tests/basic.rs @@ -0,0 +1,52 @@ +use super::*; +use linkerd_app_core::{ + proxy::http::{self, StatusCode}, + svc, trace, NameAddr, +}; +use linkerd_proxy_client_policy as client_policy; +use std::{net::SocketAddr, sync::Arc}; +use tokio::sync::watch; + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn routes() { + let _trace = trace::test::trace_init(); + + const AUTHORITY: &str = "logical.test.svc.cluster.local"; + const PORT: u16 = 666; + let addr = SocketAddr::new([192, 0, 2, 41].into(), PORT); + let dest: NameAddr = format!("{AUTHORITY}:{PORT}") + .parse::() + .expect("dest addr is valid"); + let (svc, mut handle) = tower_test::mock::pair(); + let connect = HttpConnect::default().service(addr, svc); + let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default()); + let (rt, _shutdown) = runtime(); + let stack = Outbound::new(default_config(), rt, &mut Default::default()) + .with_stack(svc::ArcNewService::new(connect)) + .push_http_cached(resolve) + .into_inner(); + + let backend = default_backend(&dest); + let (_route_tx, routes) = + watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { + addr: dest.into(), + meta: ParentRef(client_policy::Meta::new_default("parent")), + backends: Arc::new([backend.clone()]), + routes: Arc::new([default_route(backend)]), + failure_accrual: client_policy::FailureAccrual::None, + }))); + let target = Target { + num: 1, + version: http::Version::H2, + routes, + }; + let svc = stack.new_service(target); + + handle.allow(1); + let rsp = send_req(svc.clone(), http_get()); + serve(&mut handle, mk_rsp(StatusCode::OK, "good")).await; + assert_eq!( + rsp.await.expect("request must succeed").status(), + http::StatusCode::OK + ); +} diff --git a/linkerd/app/outbound/src/http/logical/tests/failure_accrual.rs b/linkerd/app/outbound/src/http/logical/tests/failure_accrual.rs new file mode 100644 index 0000000000..bd184368e6 --- /dev/null +++ b/linkerd/app/outbound/src/http/logical/tests/failure_accrual.rs @@ -0,0 +1,264 @@ +use super::*; +use linkerd_app_core::{ + errors, + exp_backoff::ExponentialBackoff, + proxy::http::{self, StatusCode}, + svc, trace, NameAddr, +}; +use linkerd_proxy_client_policy as client_policy; +use std::{net::SocketAddr, sync::Arc, time::Duration}; +use tokio::{sync::watch, task, time}; +use tracing::info; + +const AUTHORITY: &str = "logical.test.svc.cluster.local"; +const PORT: u16 = 666; + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn consecutive_failures_accrue() { + let _trace = trace::test::with_default_filter(format!("{},trace", trace::test::DEFAULT_LOG)); + + let addr = SocketAddr::new([192, 0, 2, 41].into(), PORT); + let dest: NameAddr = format!("{AUTHORITY}:{PORT}") + .parse::() + .expect("dest addr is valid"); + let (svc, mut handle) = tower_test::mock::pair(); + let connect = HttpConnect::default().service(addr, svc); + let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default()); + let (rt, _shutdown) = runtime(); + let cfg = default_config(); + let stack = Outbound::new(cfg.clone(), rt, &mut Default::default()) + .with_stack(svc::ArcNewService::new(connect)) + .push_http_cached(resolve) + .into_inner(); + + let backend = default_backend(&dest); + // Ensure that the probe delay is longer than the failfast timeout, so that + // the service is only probed after it has entered failfast when the gate + // shuts. + let min_backoff = cfg.http_request_queue.failfast_timeout + Duration::from_secs(1); + let backoff = ExponentialBackoff::try_new( + min_backoff, + min_backoff * 6, + // no jitter --- ensure the test is deterministic + 0.0, + ) + .unwrap(); + let mut backoffs = backoff.stream(); + let (_route_tx, routes) = + watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { + addr: dest.into(), + meta: ParentRef(client_policy::Meta::new_default("parent")), + backends: Arc::new([backend.clone()]), + routes: Arc::new([default_route(backend)]), + failure_accrual: client_policy::FailureAccrual::ConsecutiveFailures { + max_failures: 3, + backoff, + }, + }))); + let target = Target { + num: 1, + version: http::Version::H2, + routes, + }; + let svc = stack.new_service(target); + + info!("Sending good request"); + handle.allow(1); + let rsp = send_req(svc.clone(), http_get()); + serve(&mut handle, mk_rsp(StatusCode::OK, "good")).await; + assert_rsp(rsp, StatusCode::OK, "good").await; + + // fail 3 requests so that we hit the consecutive failures accrual limit + for i in 1..=3 { + info!("Sending bad request {i}/3"); + handle.allow(1); + let rsp = send_req(svc.clone(), http_get()); + serve( + &mut handle, + mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "bad"), + ) + .await; + assert_rsp(rsp, StatusCode::INTERNAL_SERVER_ERROR, "bad").await; + } + + // Ensure that the error is because of the breaker, and not because the + // underlying service doesn't poll ready. + info!("Sending request while in failfast"); + handle.allow(1); + // We are now in failfast. + let error = send_req(svc.clone(), http_get()) + .await + .expect_err("service should be in failfast"); + assert!( + errors::is_caused_by::(error.as_ref()), + "service should be in failfast" + ); + + info!("Sending request while in loadshed"); + let error = send_req(svc.clone(), http_get()) + .await + .expect_err("service should be in failfast"); + assert!( + errors::is_caused_by::(error.as_ref()), + "service should be in failfast" + ); + + // After the probation period, a subsequent request should be failed by + // hitting the service. + info!("Waiting for probation"); + backoffs.next().await; + task::yield_now().await; + + info!("Sending a bad request while in probation"); + handle.allow(1); + let rsp = send_req(svc.clone(), http_get()); + info!("Serving response"); + tokio::time::timeout( + time::Duration::from_secs(10), + serve( + &mut handle, + mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "bad"), + ), + ) + .await + .expect("no timeouts"); + assert_rsp(rsp, StatusCode::INTERNAL_SERVER_ERROR, "bad").await; + + // We are now in failfast. + info!("Sending a failfast request while the circuit is broken"); + handle.allow(1); + let error = send_req(svc.clone(), http_get()) + .await + .expect_err("service should be in failfast"); + assert!( + errors::is_caused_by::(error.as_ref()), + "service should be in failfast" + ); + + // Wait out the probation period again + info!("Waiting for probation again"); + backoffs.next().await; + task::yield_now().await; + + // The probe request succeeds + info!("Sending a good request while in probation"); + handle.allow(1); + let rsp = send_req(svc.clone(), http_get()); + tokio::time::timeout( + time::Duration::from_secs(10), + serve(&mut handle, mk_rsp(StatusCode::OK, "good")), + ) + .await + .expect("no timeouts"); + assert_rsp(rsp, StatusCode::OK, "good").await; + + // The gate is now open again + info!("Sending a final good request"); + handle.allow(1); + let rsp = send_req(svc.clone(), http_get()); + tokio::time::timeout( + time::Duration::from_secs(10), + serve(&mut handle, mk_rsp(StatusCode::OK, "good")), + ) + .await + .expect("no timeouts"); + assert_rsp(rsp, StatusCode::OK, "good").await; +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn balancer_doesnt_select_tripped_breakers() { + let _trace = trace::test::with_default_filter(format!( + "{},linkerd_app_outbound=trace,linkerd_stack=trace,linkerd2_proxy_http_balance=trace", + trace::test::DEFAULT_LOG + )); + + let addr1 = SocketAddr::new([192, 0, 2, 41].into(), PORT); + let addr2 = SocketAddr::new([192, 0, 2, 42].into(), PORT); + let dest: NameAddr = format!("{AUTHORITY}:{PORT}") + .parse::() + .expect("dest addr is valid"); + let (svc1, mut handle1) = tower_test::mock::pair(); + let (svc2, mut handle2) = tower_test::mock::pair(); + let connect = HttpConnect::default() + .service(addr1, svc1) + .service(addr2, svc2); + let resolve = support::resolver(); + let mut dest_tx = resolve.endpoint_tx(dest.clone()); + dest_tx + .add([(addr1, Default::default()), (addr2, Default::default())]) + .unwrap(); + let (rt, _shutdown) = runtime(); + let cfg = default_config(); + let stack = Outbound::new(cfg.clone(), rt, &mut Default::default()) + .with_stack(svc::ArcNewService::new(connect)) + .push_http_cached(resolve) + .into_inner(); + + let backend = default_backend(&dest); + // Ensure that the probe delay is longer than the failfast timeout, so that + // the service is only probed after it has entered failfast when the gate + // shuts. + let min_backoff = cfg.http_request_queue.failfast_timeout + Duration::from_secs(1); + let backoff = ExponentialBackoff::try_new( + min_backoff, + min_backoff * 6, + // no jitter --- ensure the test is deterministic + 0.0, + ) + .unwrap(); + let (_route_tx, routes) = + watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { + addr: dest.into(), + meta: ParentRef(client_policy::Meta::new_default("parent")), + backends: Arc::new([backend.clone()]), + routes: Arc::new([default_route(backend)]), + failure_accrual: client_policy::FailureAccrual::ConsecutiveFailures { + max_failures: 3, + backoff, + }, + }))); + let target = Target { + num: 1, + version: http::Version::H2, + routes, + }; + let svc = stack.new_service(target); + + // fail 3 requests so that we hit the consecutive failures accrual limit + let mut failed = 0; + while failed < 3 { + handle1.allow(1); + handle2.allow(1); + info!(failed); + let rsp = send_req(svc.clone(), http_get()); + let (expected_status, expected_body) = tokio::select! { + _ = serve(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")) => { + info!("Balancer selected good endpoint"); + (StatusCode::OK, "endpoint 1") + } + _ = serve(&mut handle2, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "endpoint 2")) => { + info!("Balancer selected bad endpoint"); + failed += 1; + (StatusCode::INTERNAL_SERVER_ERROR, "endpoint 2") + } + }; + assert_rsp(rsp, expected_status, expected_body).await; + task::yield_now().await; + } + + handle1.allow(1); + handle2.allow(1); + let rsp = send_req(svc.clone(), http_get()); + // The load balancer will select endpoint 1, because endpoint 2 isn't ready. + serve(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await; + assert_rsp(rsp, StatusCode::OK, "endpoint 1").await; + + // The load balancer should continue selecting the non-failing endpoint. + for _ in 0..8 { + handle1.allow(1); + handle2.allow(1); + let rsp = send_req(svc.clone(), http_get()); + serve(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await; + assert_rsp(rsp, StatusCode::OK, "endpoint 1").await; + } +} diff --git a/linkerd/app/outbound/src/http/logical/tests/retries.rs b/linkerd/app/outbound/src/http/logical/tests/retries.rs index 82c45200cd..42c284a592 100644 --- a/linkerd/app/outbound/src/http/logical/tests/retries.rs +++ b/linkerd/app/outbound/src/http/logical/tests/retries.rs @@ -1,23 +1,18 @@ -use super::{ - super::{policy, Outbound, ParentRef, Routes}, - default_backend, http_get, mk_grpc_rsp, mk_rsp, send_req, serve_delayed, serve_req, - HttpConnect, Request, Response, Target, -}; -use crate::test_util::*; +use super::*; use hyper::body::HttpBody; use linkerd_app_core::{ errors, proxy::http::{self, StatusCode}, - svc::{self, http::stream_timeouts::StreamDeadlineError, NewService}, - trace, NameAddr, + svc::http::stream_timeouts::StreamDeadlineError, + trace, }; use linkerd_proxy_client_policy::{ self as client_policy, grpc::{Codes, RouteParams as GrpcParams}, http::{RouteParams as HttpParams, Timeouts}, }; -use std::{collections::BTreeSet, net::SocketAddr, sync::Arc, time::Duration}; -use tokio::{sync::watch, time}; +use std::collections::BTreeSet; +use tokio::time; use tonic::Code; use tracing::{info, Instrument}; @@ -25,7 +20,7 @@ use tracing::{info, Instrument}; async fn http_5xx() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); + const TIMEOUT: time::Duration = time::Duration::from_secs(2); let (svc, mut handle) = mock_http(HttpParams { timeouts: Timeouts { request: Some(TIMEOUT), @@ -40,31 +35,29 @@ async fn http_5xx() { }), }); - info!("Sending a request that will initially fail and then succeed"); tokio::spawn( async move { handle.allow(2); - info!("Failing the first request"); - serve_req(&mut handle, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "")).await; - info!("Serving the second request"); - serve_req(&mut handle, mk_rsp(StatusCode::NO_CONTENT, "")).await; + serve(&mut handle, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "")).await; + serve(&mut handle, mk_rsp(StatusCode::NO_CONTENT, "")).await; handle } .in_current_span(), ); - info!("Verifying that we see the successful response"); + info!("Sending a request that will initially fail and then succeed"); let rsp = time::timeout(TIMEOUT, send_req(svc.clone(), http_get())) .await .expect("response"); + info!("Verifying that we see the successful response"); assert_eq!(rsp.expect("response").status(), StatusCode::NO_CONTENT); } #[tokio::test(flavor = "current_thread", start_paused = true)] -async fn http_5xx_limits() { +async fn http_5xx_limited() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); + const TIMEOUT: time::Duration = time::Duration::from_secs(2); let (svc, mut handle) = mock_http(HttpParams { timeouts: Timeouts { request: Some(TIMEOUT), @@ -83,14 +76,26 @@ async fn http_5xx_limits() { tokio::spawn( async move { handle.allow(3); - info!("Failing the first request"); - serve_req(&mut handle, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "")).await; - info!("Failing the second request"); - serve_req(&mut handle, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "")).await; - info!("Failing the third request"); - serve_req(&mut handle, mk_rsp(StatusCode::GATEWAY_TIMEOUT, "")).await; + serve(&mut handle, async move { + info!("Failing the first request"); + mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "").await + }) + .await; + serve(&mut handle, async move { + info!("Failing the second request"); + mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "").await + }) + .await; + serve(&mut handle, async move { + info!("Failing the third request"); + mk_rsp(StatusCode::GATEWAY_TIMEOUT, "").await + }) + .await; info!("Prepping the fourth request (shouldn't be served)"); - serve_req(&mut handle, mk_rsp(StatusCode::NO_CONTENT, "")).await; + serve(&mut handle, async move { + mk_rsp(StatusCode::NO_CONTENT, "").await + }) + .await; handle } .in_current_span(), @@ -107,7 +112,7 @@ async fn http_5xx_limits() { async fn http_timeout() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); + const TIMEOUT: time::Duration = time::Duration::from_secs(2); let (svc, mut handle) = mock_http(HttpParams { timeouts: Timeouts { request: Some(TIMEOUT), @@ -127,16 +132,18 @@ async fn http_timeout() { async move { handle.allow(2); - info!("Delaying the first request"); - serve_delayed( - TIMEOUT / 2, - &mut handle, - Ok(mk_rsp(StatusCode::NOT_FOUND, "")), - ) + serve(&mut handle, async move { + info!("Delaying the first request"); + time::sleep(TIMEOUT / 2).await; + mk_rsp(StatusCode::NOT_FOUND, "").await + }) .await; - info!("Serving the second request"); - serve_req(&mut handle, mk_rsp(StatusCode::NO_CONTENT, "")).await; + serve(&mut handle, async move { + info!("Serving the second request"); + mk_rsp(StatusCode::NO_CONTENT, "").await + }) + .await; handle } @@ -154,7 +161,7 @@ async fn http_timeout() { async fn http_timeout_on_limit() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); + const TIMEOUT: time::Duration = time::Duration::from_secs(2); let (svc, mut handle) = mock_http(HttpParams { timeouts: Timeouts { request: Some(TIMEOUT), @@ -169,25 +176,22 @@ async fn http_timeout_on_limit() { }), }); - info!("Testing that a retry timeout does not apply when max retries is reached"); tokio::spawn( async move { handle.allow(2); - info!("Delaying the first request"); - serve_delayed( - TIMEOUT / 3, - &mut handle, - Ok(mk_rsp(StatusCode::NOT_FOUND, "")), - ) + serve(&mut handle, async move { + info!("Delaying the first request"); + time::sleep(TIMEOUT / 3).await; + mk_rsp(StatusCode::NOT_FOUND, "").await + }) .await; - info!("Delaying the second request"); - serve_delayed( - TIMEOUT / 3, - &mut handle, - Ok(mk_rsp(StatusCode::NO_CONTENT, "")), - ) + serve(&mut handle, async move { + info!("Delaying the second request"); + time::sleep(TIMEOUT / 3).await; + mk_rsp(StatusCode::NO_CONTENT, "").await + }) .await; handle @@ -195,10 +199,12 @@ async fn http_timeout_on_limit() { .in_current_span(), ); - info!("Verifying that the initial request was retried"); + info!("Testing that a retry timeout does not apply when max retries is reached"); let rsp = time::timeout(TIMEOUT, send_req(svc.clone(), http_get())) .await .expect("response"); + + info!("Verifying that the initial request was retried"); assert_eq!(rsp.expect("response").status(), StatusCode::NO_CONTENT); } @@ -206,7 +212,7 @@ async fn http_timeout_on_limit() { async fn http_timeout_with_request_timeout() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_millis(100); + const TIMEOUT: time::Duration = time::Duration::from_millis(100); let (svc, mut handle) = mock_http(HttpParams { timeouts: Timeouts { request: Some(TIMEOUT * 5), @@ -226,47 +232,49 @@ async fn http_timeout_with_request_timeout() { async move { handle.allow(6); - info!("Delaying the first request"); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_rsp(StatusCode::IM_A_TEAPOT, "")), - ) + // First request. + + serve(&mut handle, async move { + info!("Delaying the first request"); + time::sleep(TIMEOUT * 2).await; + mk_rsp(StatusCode::IM_A_TEAPOT, "").await + }) .await; - info!("Delaying the second request"); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_rsp(StatusCode::IM_A_TEAPOT, "")), - ) + serve(&mut handle, async move { + info!("Delaying the second request"); + time::sleep(TIMEOUT * 2).await; + mk_rsp(StatusCode::IM_A_TEAPOT, "").await + }) .await; - info!("Delaying the third request"); - serve_req(&mut handle, mk_rsp(StatusCode::NO_CONTENT, "")).await; + serve(&mut handle, async move { + info!("Delaying the third request"); + mk_rsp(StatusCode::NO_CONTENT, "").await + }) + .await; - info!("Delaying the fourth request"); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_rsp(StatusCode::IM_A_TEAPOT, "")), - ) + // Second request + + serve(&mut handle, async move { + info!("Delaying the fourth request"); + time::sleep(TIMEOUT * 2).await; + mk_rsp(StatusCode::IM_A_TEAPOT, "").await + }) .await; - info!("Delaying the fifth request"); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_rsp(StatusCode::IM_A_TEAPOT, "")), - ) + serve(&mut handle, async move { + info!("Delaying the fifth request"); + time::sleep(TIMEOUT * 2).await; + mk_rsp(StatusCode::IM_A_TEAPOT, "").await + }) .await; - info!("Delaying the sixth request"); - serve_delayed( - TIMEOUT * 5, - &mut handle, - Ok(mk_rsp(StatusCode::NO_CONTENT, "")), - ) + serve(&mut handle, async move { + info!("Delaying the sixth request"); + time::sleep(TIMEOUT * 5).await; + mk_rsp(StatusCode::NO_CONTENT, "").await + }) .await; handle @@ -293,7 +301,7 @@ async fn http_timeout_with_request_timeout() { async fn grpc_internal() { let _trace = trace::test::with_default_filter("linkerd=debug"); - const TIMEOUT: Duration = Duration::from_millis(100); + const TIMEOUT: time::Duration = time::Duration::from_millis(100); let (svc, mut handle) = mock_grpc(GrpcParams { timeouts: Timeouts { request: Some(TIMEOUT), @@ -318,9 +326,9 @@ async fn grpc_internal() { async move { handle.allow(2); info!("Failing the first request"); - serve_req(&mut handle, mk_grpc_rsp(tonic::Code::Internal)).await; + serve(&mut handle, mk_grpc_rsp(tonic::Code::Internal)).await; info!("Serving the second request"); - serve_req(&mut handle, mk_grpc_rsp(tonic::Code::Ok)).await; + serve(&mut handle, mk_grpc_rsp(tonic::Code::Ok)).await; handle } .in_current_span(), @@ -358,7 +366,7 @@ async fn grpc_internal() { async fn grpc_timeout() { let _trace = trace::test::with_default_filter("linkerd=debug"); - const TIMEOUT: Duration = Duration::from_millis(100); + const TIMEOUT: time::Duration = time::Duration::from_millis(100); let (svc, mut handle) = mock_grpc(GrpcParams { timeouts: Timeouts { request: Some(TIMEOUT * 5), @@ -378,14 +386,13 @@ async fn grpc_timeout() { async move { handle.allow(2); info!("Delaying the first request"); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_grpc_rsp(tonic::Code::NotFound)), - ) + serve(&mut handle, async move { + time::sleep(TIMEOUT * 2).await; + mk_grpc_rsp(tonic::Code::NotFound).await + }) .await; info!("Serving the second request"); - serve_req(&mut handle, mk_grpc_rsp(tonic::Code::Ok)).await; + serve(&mut handle, mk_grpc_rsp(tonic::Code::Ok)).await; handle } .in_current_span(), @@ -418,88 +425,3 @@ async fn grpc_timeout() { "0" ); } - -// === Utils === - -type Handle = tower_test::mock::Handle; - -fn mock_http(params: HttpParams) -> (svc::BoxCloneHttp, Handle) { - let dest: NameAddr = "example.com:1234".parse::().unwrap(); - let backend = default_backend(&dest); - let route = mk_route(backend.clone(), params); - mock( - dest.clone(), - policy::Params::Http(policy::HttpParams { - addr: dest.into(), - meta: ParentRef(client_policy::Meta::new_default("parent")), - backends: Arc::new([backend]), - routes: Arc::new([route]), - failure_accrual: client_policy::FailureAccrual::None, - }), - ) -} - -fn mock_grpc(params: GrpcParams) -> (svc::BoxCloneHttp, Handle) { - let dest: NameAddr = "example.com:1234".parse::().unwrap(); - let backend = default_backend(&dest); - let route = mk_route(backend.clone(), params); - mock( - dest.clone(), - policy::Params::Grpc(policy::GrpcParams { - addr: dest.into(), - meta: ParentRef(client_policy::Meta::new_default("parent")), - backends: Arc::new([backend]), - routes: Arc::new([route]), - failure_accrual: client_policy::FailureAccrual::None, - }), - ) -} - -fn mock(dest: NameAddr, params: policy::Params) -> (svc::BoxCloneHttp, Handle) { - let addr = SocketAddr::new([192, 0, 2, 41].into(), 1234); - - let (inner, handle) = tower_test::mock::pair(); - let connect = HttpConnect::default().service(addr, inner); - let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default()); - let (rt, _shutdown) = runtime(); - let stack = Outbound::new(default_config(), rt, &mut Default::default()) - .with_stack(svc::ArcNewService::new(connect)) - .push_http_cached(resolve) - .into_inner(); - - let (tx, routes) = watch::channel(Routes::Policy(params)); - tokio::spawn(async move { - tx.closed().await; - }); - - let svc = stack.new_service(Target { - num: 1, - version: http::Version::H2, - routes, - }); - - (svc, handle) -} - -fn mk_route( - backend: client_policy::Backend, - params: P, -) -> client_policy::route::Route> { - use client_policy::*; - - route::Route { - hosts: vec![], - rules: vec![route::Rule { - matches: vec![M::default()], - policy: RoutePolicy { - meta: Meta::new_default("route"), - filters: [].into(), - distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { - filters: [].into(), - backend, - }])), - params, - }, - }], - } -} diff --git a/linkerd/app/outbound/src/http/logical/tests/timeouts.rs b/linkerd/app/outbound/src/http/logical/tests/timeouts.rs index 40222ca596..b0489e32e2 100644 --- a/linkerd/app/outbound/src/http/logical/tests/timeouts.rs +++ b/linkerd/app/outbound/src/http/logical/tests/timeouts.rs @@ -1,42 +1,40 @@ -use super::{ - super::{policy, LogicalError, Outbound, ParentRef, Routes}, - default_backend, http_get, mk_rsp, send_req, serve_delayed, serve_req, HttpConnect, Request, - Response, Target, -}; -use crate::test_util::*; +use super::{super::LogicalError, *}; use linkerd_app_core::{ errors, proxy::http::{ self, stream_timeouts::{BodyTimeoutError, ResponseTimeoutError}, - BoxBody, HttpBody, StatusCode, + BoxBody, HttpBody, }, - svc::{self, NewService}, - trace, NameAddr, + trace, }; use linkerd_proxy_client_policy::{self as client_policy, http::Timeouts}; -use std::{net::SocketAddr, sync::Arc, time::Duration}; -use tokio::{sync::watch, time}; -use tracing::info; +use tokio::time; +use tracing::{info, Instrument}; #[tokio::test(flavor = "current_thread", start_paused = true)] async fn request_timeout_response_headers() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - request: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + request: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); info!("Sending a request that does not respond within the timeout"); handle.allow(1); let call = send_req(svc.clone(), http_get()); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_rsp(StatusCode::NO_CONTENT, "")), - ) + serve(&mut handle, async move { + time::sleep(TIMEOUT * 2).await; + Ok(http::Response::builder() + .status(204) + .body(http::BoxBody::default()) + .unwrap()) + }) .await; info!("Verifying that the response fails with the expected error"); @@ -61,9 +59,12 @@ async fn request_timeout_response_headers() { async fn request_timeout_request_body() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - request: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + request: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); @@ -101,23 +102,28 @@ async fn request_timeout_request_body() { async fn request_timeout_response_body() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - request: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + request: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); info!("Sending a request that responds immediately but does not complete"); handle.allow(1); let call = send_req(svc.clone(), http_get()); - serve_req( + serve( &mut handle, - http::Response::builder() - .status(200) - .body(http::BoxBody::new(MockBody::new(async move { - futures::future::pending().await - }))) - .unwrap(), + future::ok( + http::Response::builder() + .status(200) + .body(http::BoxBody::new(MockBody::new(async move { + futures::future::pending().await + }))) + .unwrap(), + ), ) .await; @@ -142,23 +148,25 @@ async fn request_timeout_response_body() { async fn response_timeout_response_headers() { let _trace = trace::test::with_default_filter("linkerd=trace"); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - response: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + response: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); info!("Sending a request that does not respond within the timeout"); handle.allow(1); let call = send_req(svc.clone(), http_get()); - serve_delayed( - TIMEOUT * 2, - &mut handle, + serve(&mut handle, async move { + time::sleep(TIMEOUT * 2).await; Ok(http::Response::builder() .status(204) .body(http::BoxBody::default()) - .unwrap()), - ) + .unwrap()) + }) .await; info!("Verifying that the response fails with the expected error"); @@ -179,28 +187,36 @@ async fn response_timeout_response_headers() { async fn response_timeout_response_body() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - response: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + response: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); + tokio::spawn( + async move { + handle.allow(1); + serve(&mut handle, async move { + info!("Serving a response that never completes"); + Ok(http::Response::builder() + .status(200) + .body(http::BoxBody::new(MockBody::new(async move { + futures::future::pending().await + }))) + .unwrap()) + }) + .await; + } + .in_current_span(), + ); + info!("Sending a request that responds immediately but does not complete"); - handle.allow(1); - let call = send_req(svc.clone(), http_get()); - serve_req( - &mut handle, - http::Response::builder() - .status(200) - .body(http::BoxBody::new(MockBody::new(async move { - futures::future::pending().await - }))) - .unwrap(), - ) - .await; + let mut rsp = send_req(svc.clone(), http_get()).await.unwrap().into_body(); info!("Verifying that the request body times out with the expected stream error"); - let mut rsp = call.await.unwrap().into_body(); let error = time::timeout(TIMEOUT * 2, rsp.data()) .await .expect("should timeout internally") @@ -220,25 +236,27 @@ async fn response_timeout_response_body() { async fn response_timeout_ignores_request_body() { let _trace = trace::test::with_default_filter("linkerd=trace"); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - response: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + response: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); info!("Sending a request that exceeds the response timeout"); handle.allow(1); let call = send_req(svc.clone(), http_get()); - serve_req( - &mut handle, - http::Response::builder() + serve(&mut handle, async move { + info!("Serving a response that never completes"); + Ok(http::Response::builder() .status(200) .body(http::BoxBody::new(MockBody::new(async move { - time::sleep(TIMEOUT * 2).await; - Ok(()) + futures::future::pending().await }))) - .unwrap(), - ) + .unwrap()) + }) .await; info!("Verifying that the response succeeds despite slow request time"); @@ -252,24 +270,27 @@ async fn response_timeout_ignores_request_body() { async fn idle_timeout_response_body() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - idle: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + idle: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); info!("Sending a request that is served immediately with a body that does not update"); handle.allow(1); let call = send_req(svc.clone(), http_get()); - serve_req( - &mut handle, - http::Response::builder() + serve(&mut handle, async move { + info!("Serving a response that never completes"); + Ok(http::Response::builder() .status(200) .body(http::BoxBody::new(MockBody::new(async move { futures::future::pending().await }))) - .unwrap(), - ) + .unwrap()) + }) .await; info!("Verifying that the request body times out with the expected stream error"); @@ -288,69 +309,3 @@ async fn idle_timeout_response_body() { "expected idle timeout, got {error:?}" ); } - -// === Utils === - -type Handle = tower_test::mock::Handle; - -fn mock(timeouts: Timeouts) -> (svc::BoxCloneHttp, Handle) { - let addr = SocketAddr::new([192, 0, 2, 41].into(), 1234); - let dest: NameAddr = "example.com:1234".parse::().unwrap(); - - let (inner, handle) = tower_test::mock::pair(); - let connect = HttpConnect::default().service(addr, inner); - let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default()); - let (rt, _shutdown) = runtime(); - let stack = Outbound::new(default_config(), rt, &mut Default::default()) - .with_stack(svc::ArcNewService::new(connect)) - .push_http_cached(resolve) - .into_inner(); - - let (tx, routes) = { - let backend = default_backend(&dest); - let route = mk_route(backend.clone(), timeouts); - watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { - addr: dest.into(), - meta: ParentRef(client_policy::Meta::new_default("parent")), - backends: Arc::new([backend]), - routes: Arc::new([route]), - failure_accrual: client_policy::FailureAccrual::None, - }))) - }; - tokio::spawn(async move { - tx.closed().await; - }); - - let svc = stack.new_service(Target { - num: 1, - version: http::Version::H2, - routes, - }); - - (svc, handle) -} - -fn mk_route(backend: client_policy::Backend, timeouts: Timeouts) -> client_policy::http::Route { - use client_policy::{ - http::{self, Policy, Route, Rule}, - Meta, RouteBackend, RouteDistribution, - }; - Route { - hosts: vec![], - rules: vec![Rule { - matches: vec![http::r#match::MatchRequest::default()], - policy: Policy { - meta: Meta::new_default("timeout-route"), - filters: [].into(), - params: http::RouteParams { - timeouts, - ..Default::default() - }, - distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { - filters: [].into(), - backend, - }])), - }, - }], - } -}