diff --git a/linkerd/app/outbound/src/http/logical/tests.rs b/linkerd/app/outbound/src/http/logical/tests.rs index 3214e0f515..5fbb0ce0bd 100644 --- a/linkerd/app/outbound/src/http/logical/tests.rs +++ b/linkerd/app/outbound/src/http/logical/tests.rs @@ -59,7 +59,7 @@ async fn routes() { handle.allow(1); let rsp = send_req(svc.clone(), http_get()); - serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await; + serve(&mut handle, mk_rsp(StatusCode::OK, "good")).await; assert_eq!( rsp.await.expect("request must succeed").status(), http::StatusCode::OK @@ -118,7 +118,7 @@ async fn consecutive_failures_accrue() { info!("Sending good request"); handle.allow(1); let rsp = send_req(svc.clone(), http_get()); - serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await; + serve(&mut handle, mk_rsp(StatusCode::OK, "good")).await; assert_rsp(rsp, StatusCode::OK, "good").await; // fail 3 requests so that we hit the consecutive failures accrual limit @@ -126,7 +126,7 @@ async fn consecutive_failures_accrue() { info!("Sending bad request {i}/3"); handle.allow(1); let rsp = send_req(svc.clone(), http_get()); - serve_req( + serve( &mut handle, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "bad"), ) @@ -168,7 +168,7 @@ async fn consecutive_failures_accrue() { info!("Serving response"); tokio::time::timeout( time::Duration::from_secs(10), - serve_req( + serve( &mut handle, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "bad"), ), @@ -199,7 +199,7 @@ async fn consecutive_failures_accrue() { let rsp = send_req(svc.clone(), http_get()); tokio::time::timeout( time::Duration::from_secs(10), - serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")), + serve(&mut handle, mk_rsp(StatusCode::OK, "good")), ) .await .expect("no timeouts"); @@ -211,7 +211,7 @@ async fn consecutive_failures_accrue() { let rsp = send_req(svc.clone(), http_get()); tokio::time::timeout( time::Duration::from_secs(10), - serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")), + serve(&mut handle, mk_rsp(StatusCode::OK, "good")), ) .await .expect("no timeouts"); @@ -285,11 +285,11 @@ async fn balancer_doesnt_select_tripped_breakers() { info!(failed); let rsp = send_req(svc.clone(), http_get()); let (expected_status, expected_body) = tokio::select! { - _ = serve_req(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")) => { + _ = serve(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")) => { info!("Balancer selected good endpoint"); (StatusCode::OK, "endpoint 1") } - _ = serve_req(&mut handle2, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "endpoint 2")) => { + _ = serve(&mut handle2, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "endpoint 2")) => { info!("Balancer selected bad endpoint"); failed += 1; (StatusCode::INTERNAL_SERVER_ERROR, "endpoint 2") @@ -303,7 +303,7 @@ async fn balancer_doesnt_select_tripped_breakers() { handle2.allow(1); let rsp = send_req(svc.clone(), http_get()); // The load balancer will select endpoint 1, because endpoint 2 isn't ready. - serve_req(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await; + serve(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await; assert_rsp(rsp, StatusCode::OK, "endpoint 1").await; // The load balancer should continue selecting the non-failing endpoint. @@ -311,7 +311,7 @@ async fn balancer_doesnt_select_tripped_breakers() { handle1.allow(1); handle2.allow(1); let rsp = send_req(svc.clone(), http_get()); - serve_req(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await; + serve(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await; assert_rsp(rsp, StatusCode::OK, "endpoint 1").await; } } @@ -383,7 +383,7 @@ impl>> svc::NewService for HttpConnect { } } -// === +// === Utils === #[track_caller] fn send_req( @@ -415,15 +415,15 @@ fn send_req( async move { rsp.await.expect("request task must not panic") } } -fn mk_rsp(status: StatusCode, body: impl ToString) -> Response { - http::Response::builder() +async fn mk_rsp(status: StatusCode, body: impl ToString) -> Result { + Ok(http::Response::builder() .status(status) .body(http::BoxBody::new(body.to_string())) - .unwrap() + .unwrap()) } -fn mk_grpc_rsp(code: tonic::Code) -> Response { - http::Response::builder() +async fn mk_grpc_rsp(code: tonic::Code) -> Result { + Ok(http::Response::builder() .version(::http::Version::HTTP_2) .header( "content-type", @@ -434,7 +434,7 @@ fn mk_grpc_rsp(code: tonic::Code) -> Response { trls.insert("grpc-status", (code as u8).to_string().parse().unwrap()); Ok(Some(trls)) }))) - .unwrap() + .unwrap()) } async fn assert_rsp( @@ -452,14 +452,9 @@ async fn assert_rsp( assert_eq!(body, expected_body, "expected body to be {expected_body:?}"); } -async fn serve_req(handle: &mut tower_test::mock::Handle, rsp: Response) { - serve_delayed(Duration::ZERO, handle, Ok(rsp)).await; -} - -async fn serve_delayed( - delay: Duration, +async fn serve( handle: &mut tower_test::mock::Handle, - rsp: Result, + call: impl Future> + Send + 'static, ) { let (mut req, tx) = handle .next_request() @@ -472,24 +467,20 @@ async fn serve_delayed( while let Some(res) = req.body_mut().data().await { res.expect("request body must not error"); } - } - if !req.body().is_end_stream() { - req.body_mut() - .trailers() - .await - .expect("request body must not error"); + if !req.body().is_end_stream() { + req.body_mut() + .trailers() + .await + .expect("request body must not error"); + } } drop(req); tokio::spawn( async move { - if delay > Duration::ZERO { - tracing::debug!(?delay, "Sleeping"); - tokio::time::sleep(delay).await; - } - - tracing::debug!(?rsp, "Sending response"); - match rsp { + let res = call.await; + tracing::debug!(?res, "Sending response"); + match res { Ok(rsp) => tx.send_response(rsp), Err(e) => tx.send_error(e), } @@ -547,3 +538,85 @@ fn default_route(backend: client_policy::Backend) -> client_policy::http::Route }], } } + +type Handle = tower_test::mock::Handle; + +fn mock_http(params: client_policy::http::RouteParams) -> (svc::BoxCloneHttp, Handle) { + let dest = "example.com:1234".parse::().unwrap(); + let backend = default_backend(&dest); + let route = mk_route(backend.clone(), params); + mock(policy::Params::Http(policy::HttpParams { + addr: dest.into(), + meta: ParentRef(client_policy::Meta::new_default("parent")), + backends: Arc::new([backend]), + routes: Arc::new([route]), + failure_accrual: client_policy::FailureAccrual::None, + })) +} + +fn mock_grpc(params: client_policy::grpc::RouteParams) -> (svc::BoxCloneHttp, Handle) { + let dest = "example.com:1234".parse::().unwrap(); + let backend = default_backend(&dest); + let route = mk_route(backend.clone(), params); + mock(policy::Params::Grpc(policy::GrpcParams { + addr: dest.into(), + meta: ParentRef(client_policy::Meta::new_default("parent")), + backends: Arc::new([backend]), + routes: Arc::new([route]), + failure_accrual: client_policy::FailureAccrual::None, + })) +} + +fn mock(params: policy::Params) -> (svc::BoxCloneHttp, Handle) { + let (inner, handle) = tower_test::mock::pair(); + + let addr = SocketAddr::new([192, 0, 2, 41].into(), 1234); + let connect = HttpConnect::default().service(addr, inner); + let resolve = support::resolver().endpoint_exists( + params.addr().name_addr().unwrap().clone(), + addr, + Default::default(), + ); + let (rt, shutdown) = runtime(); + let stack = Outbound::new(default_config(), rt, &mut Default::default()) + .with_stack(svc::ArcNewService::new(connect)) + .push_http_cached(resolve) + .into_inner(); + + let (tx, routes) = watch::channel(Routes::Policy(params)); + tokio::spawn(async move { + tx.closed().await; + drop(shutdown); + }); + + let svc = stack.new_service(Target { + num: 1, + version: http::Version::H2, + routes, + }); + + (svc, handle) +} + +fn mk_route( + backend: client_policy::Backend, + params: P, +) -> client_policy::route::Route> { + use client_policy::*; + + route::Route { + hosts: vec![], + rules: vec![route::Rule { + matches: vec![M::default()], + policy: RoutePolicy { + meta: Meta::new_default("route"), + filters: [].into(), + distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { + filters: [].into(), + backend, + }])), + params, + }, + }], + } +} diff --git a/linkerd/app/outbound/src/http/logical/tests/retries.rs b/linkerd/app/outbound/src/http/logical/tests/retries.rs index 82c45200cd..95fa5bf56e 100644 --- a/linkerd/app/outbound/src/http/logical/tests/retries.rs +++ b/linkerd/app/outbound/src/http/logical/tests/retries.rs @@ -1,23 +1,18 @@ -use super::{ - super::{policy, Outbound, ParentRef, Routes}, - default_backend, http_get, mk_grpc_rsp, mk_rsp, send_req, serve_delayed, serve_req, - HttpConnect, Request, Response, Target, -}; -use crate::test_util::*; +use super::{http_get, mk_grpc_rsp, mk_rsp, mock_grpc, mock_http, send_req, serve}; use hyper::body::HttpBody; use linkerd_app_core::{ errors, proxy::http::{self, StatusCode}, - svc::{self, http::stream_timeouts::StreamDeadlineError, NewService}, - trace, NameAddr, + svc::http::stream_timeouts::StreamDeadlineError, + trace, }; use linkerd_proxy_client_policy::{ self as client_policy, grpc::{Codes, RouteParams as GrpcParams}, http::{RouteParams as HttpParams, Timeouts}, }; -use std::{collections::BTreeSet, net::SocketAddr, sync::Arc, time::Duration}; -use tokio::{sync::watch, time}; +use std::collections::BTreeSet; +use tokio::time; use tonic::Code; use tracing::{info, Instrument}; @@ -25,7 +20,7 @@ use tracing::{info, Instrument}; async fn http_5xx() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); + const TIMEOUT: time::Duration = time::Duration::from_secs(2); let (svc, mut handle) = mock_http(HttpParams { timeouts: Timeouts { request: Some(TIMEOUT), @@ -38,33 +33,32 @@ async fn http_5xx() { timeout: None, backoff: None, }), + ..Default::default() }); - info!("Sending a request that will initially fail and then succeed"); tokio::spawn( async move { handle.allow(2); - info!("Failing the first request"); - serve_req(&mut handle, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "")).await; - info!("Serving the second request"); - serve_req(&mut handle, mk_rsp(StatusCode::NO_CONTENT, "")).await; + serve(&mut handle, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "")).await; + serve(&mut handle, mk_rsp(StatusCode::NO_CONTENT, "")).await; handle } .in_current_span(), ); - info!("Verifying that we see the successful response"); + info!("Sending a request that will initially fail and then succeed"); let rsp = time::timeout(TIMEOUT, send_req(svc.clone(), http_get())) .await .expect("response"); + info!("Verifying that we see the successful response"); assert_eq!(rsp.expect("response").status(), StatusCode::NO_CONTENT); } #[tokio::test(flavor = "current_thread", start_paused = true)] -async fn http_5xx_limits() { +async fn http_5xx_limited() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); + const TIMEOUT: time::Duration = time::Duration::from_secs(2); let (svc, mut handle) = mock_http(HttpParams { timeouts: Timeouts { request: Some(TIMEOUT), @@ -77,20 +71,33 @@ async fn http_5xx_limits() { timeout: None, backoff: None, }), + ..Default::default() }); info!("Sending a request that will initially fail and then succeed"); tokio::spawn( async move { handle.allow(3); - info!("Failing the first request"); - serve_req(&mut handle, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "")).await; - info!("Failing the second request"); - serve_req(&mut handle, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "")).await; - info!("Failing the third request"); - serve_req(&mut handle, mk_rsp(StatusCode::GATEWAY_TIMEOUT, "")).await; + serve(&mut handle, async move { + info!("Failing the first request"); + mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "").await + }) + .await; + serve(&mut handle, async move { + info!("Failing the second request"); + mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "").await + }) + .await; + serve(&mut handle, async move { + info!("Failing the third request"); + mk_rsp(StatusCode::GATEWAY_TIMEOUT, "").await + }) + .await; info!("Prepping the fourth request (shouldn't be served)"); - serve_req(&mut handle, mk_rsp(StatusCode::NO_CONTENT, "")).await; + serve(&mut handle, async move { + mk_rsp(StatusCode::NO_CONTENT, "").await + }) + .await; handle } .in_current_span(), @@ -107,7 +114,7 @@ async fn http_5xx_limits() { async fn http_timeout() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); + const TIMEOUT: time::Duration = time::Duration::from_secs(2); let (svc, mut handle) = mock_http(HttpParams { timeouts: Timeouts { request: Some(TIMEOUT), @@ -120,6 +127,7 @@ async fn http_timeout() { timeout: Some(TIMEOUT / 4), backoff: None, }), + ..Default::default() }); info!("Sending a request that will initially timeout and then succeed"); @@ -127,16 +135,18 @@ async fn http_timeout() { async move { handle.allow(2); - info!("Delaying the first request"); - serve_delayed( - TIMEOUT / 2, - &mut handle, - Ok(mk_rsp(StatusCode::NOT_FOUND, "")), - ) + serve(&mut handle, async move { + info!("Delaying the first request"); + time::sleep(TIMEOUT / 2).await; + mk_rsp(StatusCode::NOT_FOUND, "").await + }) .await; - info!("Serving the second request"); - serve_req(&mut handle, mk_rsp(StatusCode::NO_CONTENT, "")).await; + serve(&mut handle, async move { + info!("Serving the second request"); + mk_rsp(StatusCode::NO_CONTENT, "").await + }) + .await; handle } @@ -154,7 +164,7 @@ async fn http_timeout() { async fn http_timeout_on_limit() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); + const TIMEOUT: time::Duration = time::Duration::from_secs(2); let (svc, mut handle) = mock_http(HttpParams { timeouts: Timeouts { request: Some(TIMEOUT), @@ -167,27 +177,25 @@ async fn http_timeout_on_limit() { timeout: Some(TIMEOUT / 4), backoff: None, }), + ..Default::default() }); - info!("Testing that a retry timeout does not apply when max retries is reached"); tokio::spawn( async move { handle.allow(2); - info!("Delaying the first request"); - serve_delayed( - TIMEOUT / 3, - &mut handle, - Ok(mk_rsp(StatusCode::NOT_FOUND, "")), - ) + serve(&mut handle, async move { + info!("Delaying the first request"); + time::sleep(TIMEOUT / 3).await; + mk_rsp(StatusCode::NOT_FOUND, "").await + }) .await; - info!("Delaying the second request"); - serve_delayed( - TIMEOUT / 3, - &mut handle, - Ok(mk_rsp(StatusCode::NO_CONTENT, "")), - ) + serve(&mut handle, async move { + info!("Delaying the second request"); + time::sleep(TIMEOUT / 3).await; + mk_rsp(StatusCode::NO_CONTENT, "").await + }) .await; handle @@ -195,10 +203,12 @@ async fn http_timeout_on_limit() { .in_current_span(), ); - info!("Verifying that the initial request was retried"); + info!("Testing that a retry timeout does not apply when max retries is reached"); let rsp = time::timeout(TIMEOUT, send_req(svc.clone(), http_get())) .await .expect("response"); + + info!("Verifying that the initial request was retried"); assert_eq!(rsp.expect("response").status(), StatusCode::NO_CONTENT); } @@ -206,7 +216,7 @@ async fn http_timeout_on_limit() { async fn http_timeout_with_request_timeout() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_millis(100); + const TIMEOUT: time::Duration = time::Duration::from_millis(100); let (svc, mut handle) = mock_http(HttpParams { timeouts: Timeouts { request: Some(TIMEOUT * 5), @@ -219,6 +229,7 @@ async fn http_timeout_with_request_timeout() { timeout: Some(TIMEOUT), backoff: None, }), + ..Default::default() }); info!("Sending a request that will initially timeout and then succeed"); @@ -226,47 +237,49 @@ async fn http_timeout_with_request_timeout() { async move { handle.allow(6); - info!("Delaying the first request"); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_rsp(StatusCode::IM_A_TEAPOT, "")), - ) + // First request. + + serve(&mut handle, async move { + info!("Delaying the first request"); + time::sleep(TIMEOUT * 2).await; + mk_rsp(StatusCode::IM_A_TEAPOT, "").await + }) .await; - info!("Delaying the second request"); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_rsp(StatusCode::IM_A_TEAPOT, "")), - ) + serve(&mut handle, async move { + info!("Delaying the second request"); + time::sleep(TIMEOUT * 2).await; + mk_rsp(StatusCode::IM_A_TEAPOT, "").await + }) .await; - info!("Delaying the third request"); - serve_req(&mut handle, mk_rsp(StatusCode::NO_CONTENT, "")).await; + serve(&mut handle, async move { + info!("Delaying the third request"); + mk_rsp(StatusCode::NO_CONTENT, "").await + }) + .await; - info!("Delaying the fourth request"); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_rsp(StatusCode::IM_A_TEAPOT, "")), - ) + // Second request + + serve(&mut handle, async move { + info!("Delaying the fourth request"); + time::sleep(TIMEOUT * 2).await; + mk_rsp(StatusCode::IM_A_TEAPOT, "").await + }) .await; - info!("Delaying the fifth request"); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_rsp(StatusCode::IM_A_TEAPOT, "")), - ) + serve(&mut handle, async move { + info!("Delaying the fifth request"); + time::sleep(TIMEOUT * 2).await; + mk_rsp(StatusCode::IM_A_TEAPOT, "").await + }) .await; - info!("Delaying the sixth request"); - serve_delayed( - TIMEOUT * 5, - &mut handle, - Ok(mk_rsp(StatusCode::NO_CONTENT, "")), - ) + serve(&mut handle, async move { + info!("Delaying the sixth request"); + time::sleep(TIMEOUT * 5).await; + mk_rsp(StatusCode::NO_CONTENT, "").await + }) .await; handle @@ -293,7 +306,7 @@ async fn http_timeout_with_request_timeout() { async fn grpc_internal() { let _trace = trace::test::with_default_filter("linkerd=debug"); - const TIMEOUT: Duration = Duration::from_millis(100); + const TIMEOUT: time::Duration = time::Duration::from_millis(100); let (svc, mut handle) = mock_grpc(GrpcParams { timeouts: Timeouts { request: Some(TIMEOUT), @@ -311,6 +324,7 @@ async fn grpc_internal() { timeout: None, backoff: None, }), + ..Default::default() }); info!("Sending a request that will initially fail and then succeed"); @@ -318,9 +332,9 @@ async fn grpc_internal() { async move { handle.allow(2); info!("Failing the first request"); - serve_req(&mut handle, mk_grpc_rsp(tonic::Code::Internal)).await; + serve(&mut handle, mk_grpc_rsp(tonic::Code::Internal)).await; info!("Serving the second request"); - serve_req(&mut handle, mk_grpc_rsp(tonic::Code::Ok)).await; + serve(&mut handle, mk_grpc_rsp(tonic::Code::Ok)).await; handle } .in_current_span(), @@ -358,7 +372,7 @@ async fn grpc_internal() { async fn grpc_timeout() { let _trace = trace::test::with_default_filter("linkerd=debug"); - const TIMEOUT: Duration = Duration::from_millis(100); + const TIMEOUT: time::Duration = time::Duration::from_millis(100); let (svc, mut handle) = mock_grpc(GrpcParams { timeouts: Timeouts { request: Some(TIMEOUT * 5), @@ -371,6 +385,7 @@ async fn grpc_timeout() { max_request_bytes: 1000, backoff: None, }), + ..Default::default() }); info!("Sending a request that will initially fail and then succeed"); @@ -378,14 +393,13 @@ async fn grpc_timeout() { async move { handle.allow(2); info!("Delaying the first request"); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_grpc_rsp(tonic::Code::NotFound)), - ) + serve(&mut handle, async move { + time::sleep(TIMEOUT * 2).await; + mk_grpc_rsp(tonic::Code::NotFound).await + }) .await; info!("Serving the second request"); - serve_req(&mut handle, mk_grpc_rsp(tonic::Code::Ok)).await; + serve(&mut handle, mk_grpc_rsp(tonic::Code::Ok)).await; handle } .in_current_span(), @@ -418,88 +432,3 @@ async fn grpc_timeout() { "0" ); } - -// === Utils === - -type Handle = tower_test::mock::Handle; - -fn mock_http(params: HttpParams) -> (svc::BoxCloneHttp, Handle) { - let dest: NameAddr = "example.com:1234".parse::().unwrap(); - let backend = default_backend(&dest); - let route = mk_route(backend.clone(), params); - mock( - dest.clone(), - policy::Params::Http(policy::HttpParams { - addr: dest.into(), - meta: ParentRef(client_policy::Meta::new_default("parent")), - backends: Arc::new([backend]), - routes: Arc::new([route]), - failure_accrual: client_policy::FailureAccrual::None, - }), - ) -} - -fn mock_grpc(params: GrpcParams) -> (svc::BoxCloneHttp, Handle) { - let dest: NameAddr = "example.com:1234".parse::().unwrap(); - let backend = default_backend(&dest); - let route = mk_route(backend.clone(), params); - mock( - dest.clone(), - policy::Params::Grpc(policy::GrpcParams { - addr: dest.into(), - meta: ParentRef(client_policy::Meta::new_default("parent")), - backends: Arc::new([backend]), - routes: Arc::new([route]), - failure_accrual: client_policy::FailureAccrual::None, - }), - ) -} - -fn mock(dest: NameAddr, params: policy::Params) -> (svc::BoxCloneHttp, Handle) { - let addr = SocketAddr::new([192, 0, 2, 41].into(), 1234); - - let (inner, handle) = tower_test::mock::pair(); - let connect = HttpConnect::default().service(addr, inner); - let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default()); - let (rt, _shutdown) = runtime(); - let stack = Outbound::new(default_config(), rt, &mut Default::default()) - .with_stack(svc::ArcNewService::new(connect)) - .push_http_cached(resolve) - .into_inner(); - - let (tx, routes) = watch::channel(Routes::Policy(params)); - tokio::spawn(async move { - tx.closed().await; - }); - - let svc = stack.new_service(Target { - num: 1, - version: http::Version::H2, - routes, - }); - - (svc, handle) -} - -fn mk_route( - backend: client_policy::Backend, - params: P, -) -> client_policy::route::Route> { - use client_policy::*; - - route::Route { - hosts: vec![], - rules: vec![route::Rule { - matches: vec![M::default()], - policy: RoutePolicy { - meta: Meta::new_default("route"), - filters: [].into(), - distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { - filters: [].into(), - backend, - }])), - params, - }, - }], - } -} diff --git a/linkerd/app/outbound/src/http/logical/tests/timeouts.rs b/linkerd/app/outbound/src/http/logical/tests/timeouts.rs index 40222ca596..fb4c406eac 100644 --- a/linkerd/app/outbound/src/http/logical/tests/timeouts.rs +++ b/linkerd/app/outbound/src/http/logical/tests/timeouts.rs @@ -1,42 +1,41 @@ -use super::{ - super::{policy, LogicalError, Outbound, ParentRef, Routes}, - default_backend, http_get, mk_rsp, send_req, serve_delayed, serve_req, HttpConnect, Request, - Response, Target, -}; +use super::{super::LogicalError, http_get, mock_http, send_req, serve}; use crate::test_util::*; use linkerd_app_core::{ errors, proxy::http::{ self, stream_timeouts::{BodyTimeoutError, ResponseTimeoutError}, - BoxBody, HttpBody, StatusCode, + BoxBody, HttpBody, }, - svc::{self, NewService}, - trace, NameAddr, + trace, }; use linkerd_proxy_client_policy::{self as client_policy, http::Timeouts}; -use std::{net::SocketAddr, sync::Arc, time::Duration}; -use tokio::{sync::watch, time}; -use tracing::info; +use tokio::time; +use tracing::{info, Instrument}; #[tokio::test(flavor = "current_thread", start_paused = true)] async fn request_timeout_response_headers() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - request: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + request: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); info!("Sending a request that does not respond within the timeout"); handle.allow(1); let call = send_req(svc.clone(), http_get()); - serve_delayed( - TIMEOUT * 2, - &mut handle, - Ok(mk_rsp(StatusCode::NO_CONTENT, "")), - ) + serve(&mut handle, async move { + time::sleep(TIMEOUT * 2).await; + Ok(http::Response::builder() + .status(204) + .body(http::BoxBody::default()) + .unwrap()) + }) .await; info!("Verifying that the response fails with the expected error"); @@ -61,9 +60,12 @@ async fn request_timeout_response_headers() { async fn request_timeout_request_body() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - request: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + request: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); @@ -101,23 +103,28 @@ async fn request_timeout_request_body() { async fn request_timeout_response_body() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - request: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + request: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); info!("Sending a request that responds immediately but does not complete"); handle.allow(1); let call = send_req(svc.clone(), http_get()); - serve_req( + serve( &mut handle, - http::Response::builder() - .status(200) - .body(http::BoxBody::new(MockBody::new(async move { - futures::future::pending().await - }))) - .unwrap(), + future::ok( + http::Response::builder() + .status(200) + .body(http::BoxBody::new(MockBody::new(async move { + futures::future::pending().await + }))) + .unwrap(), + ), ) .await; @@ -142,23 +149,25 @@ async fn request_timeout_response_body() { async fn response_timeout_response_headers() { let _trace = trace::test::with_default_filter("linkerd=trace"); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - response: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + response: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); info!("Sending a request that does not respond within the timeout"); handle.allow(1); let call = send_req(svc.clone(), http_get()); - serve_delayed( - TIMEOUT * 2, - &mut handle, + serve(&mut handle, async move { + time::sleep(TIMEOUT * 2).await; Ok(http::Response::builder() .status(204) .body(http::BoxBody::default()) - .unwrap()), - ) + .unwrap()) + }) .await; info!("Verifying that the response fails with the expected error"); @@ -179,28 +188,36 @@ async fn response_timeout_response_headers() { async fn response_timeout_response_body() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - response: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + response: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); + tokio::spawn( + async move { + handle.allow(1); + serve(&mut handle, async move { + info!("Serving a response that never completes"); + Ok(http::Response::builder() + .status(200) + .body(http::BoxBody::new(MockBody::new(async move { + futures::future::pending().await + }))) + .unwrap()) + }) + .await; + } + .in_current_span(), + ); + info!("Sending a request that responds immediately but does not complete"); - handle.allow(1); - let call = send_req(svc.clone(), http_get()); - serve_req( - &mut handle, - http::Response::builder() - .status(200) - .body(http::BoxBody::new(MockBody::new(async move { - futures::future::pending().await - }))) - .unwrap(), - ) - .await; + let mut rsp = send_req(svc.clone(), http_get()).await.unwrap().into_body(); info!("Verifying that the request body times out with the expected stream error"); - let mut rsp = call.await.unwrap().into_body(); let error = time::timeout(TIMEOUT * 2, rsp.data()) .await .expect("should timeout internally") @@ -220,25 +237,27 @@ async fn response_timeout_response_body() { async fn response_timeout_ignores_request_body() { let _trace = trace::test::with_default_filter("linkerd=trace"); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - response: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + response: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); info!("Sending a request that exceeds the response timeout"); handle.allow(1); let call = send_req(svc.clone(), http_get()); - serve_req( - &mut handle, - http::Response::builder() + serve(&mut handle, async move { + info!("Serving a response that never completes"); + Ok(http::Response::builder() .status(200) .body(http::BoxBody::new(MockBody::new(async move { - time::sleep(TIMEOUT * 2).await; - Ok(()) + futures::future::pending().await }))) - .unwrap(), - ) + .unwrap()) + }) .await; info!("Verifying that the response succeeds despite slow request time"); @@ -252,24 +271,27 @@ async fn response_timeout_ignores_request_body() { async fn idle_timeout_response_body() { let _trace = trace::test::trace_init(); - const TIMEOUT: Duration = Duration::from_secs(2); - let (svc, mut handle) = mock(Timeouts { - idle: Some(TIMEOUT), + const TIMEOUT: time::Duration = time::Duration::from_secs(2); + let (svc, mut handle) = mock_http(client_policy::http::RouteParams { + timeouts: Timeouts { + idle: Some(TIMEOUT), + ..Default::default() + }, ..Default::default() }); info!("Sending a request that is served immediately with a body that does not update"); handle.allow(1); let call = send_req(svc.clone(), http_get()); - serve_req( - &mut handle, - http::Response::builder() + serve(&mut handle, async move { + info!("Serving a response that never completes"); + Ok(http::Response::builder() .status(200) .body(http::BoxBody::new(MockBody::new(async move { futures::future::pending().await }))) - .unwrap(), - ) + .unwrap()) + }) .await; info!("Verifying that the request body times out with the expected stream error"); @@ -288,69 +310,3 @@ async fn idle_timeout_response_body() { "expected idle timeout, got {error:?}" ); } - -// === Utils === - -type Handle = tower_test::mock::Handle; - -fn mock(timeouts: Timeouts) -> (svc::BoxCloneHttp, Handle) { - let addr = SocketAddr::new([192, 0, 2, 41].into(), 1234); - let dest: NameAddr = "example.com:1234".parse::().unwrap(); - - let (inner, handle) = tower_test::mock::pair(); - let connect = HttpConnect::default().service(addr, inner); - let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default()); - let (rt, _shutdown) = runtime(); - let stack = Outbound::new(default_config(), rt, &mut Default::default()) - .with_stack(svc::ArcNewService::new(connect)) - .push_http_cached(resolve) - .into_inner(); - - let (tx, routes) = { - let backend = default_backend(&dest); - let route = mk_route(backend.clone(), timeouts); - watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { - addr: dest.into(), - meta: ParentRef(client_policy::Meta::new_default("parent")), - backends: Arc::new([backend]), - routes: Arc::new([route]), - failure_accrual: client_policy::FailureAccrual::None, - }))) - }; - tokio::spawn(async move { - tx.closed().await; - }); - - let svc = stack.new_service(Target { - num: 1, - version: http::Version::H2, - routes, - }); - - (svc, handle) -} - -fn mk_route(backend: client_policy::Backend, timeouts: Timeouts) -> client_policy::http::Route { - use client_policy::{ - http::{self, Policy, Route, Rule}, - Meta, RouteBackend, RouteDistribution, - }; - Route { - hosts: vec![], - rules: vec![Rule { - matches: vec![http::r#match::MatchRequest::default()], - policy: Policy { - meta: Meta::new_default("timeout-route"), - filters: [].into(), - params: http::RouteParams { - timeouts, - ..Default::default() - }, - distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { - filters: [].into(), - backend, - }])), - }, - }], - } -}