Skip to content

Commit

Permalink
chore(outbound): Simplify router test utilities
Browse files Browse the repository at this point in the history
In preparation for adding more tests, this commit simplifies the test setup
utilities so that they can be shared and reused across test modules.
  • Loading branch information
olix0r committed Jul 26, 2024
1 parent efe6279 commit b705d45
Show file tree
Hide file tree
Showing 3 changed files with 313 additions and 355 deletions.
147 changes: 110 additions & 37 deletions linkerd/app/outbound/src/http/logical/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn routes() {

handle.allow(1);
let rsp = send_req(svc.clone(), http_get());
serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await;
serve(&mut handle, mk_rsp(StatusCode::OK, "good")).await;
assert_eq!(
rsp.await.expect("request must succeed").status(),
http::StatusCode::OK
Expand Down Expand Up @@ -118,15 +118,15 @@ async fn consecutive_failures_accrue() {
info!("Sending good request");
handle.allow(1);
let rsp = send_req(svc.clone(), http_get());
serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await;
serve(&mut handle, mk_rsp(StatusCode::OK, "good")).await;
assert_rsp(rsp, StatusCode::OK, "good").await;

// fail 3 requests so that we hit the consecutive failures accrual limit
for i in 1..=3 {
info!("Sending bad request {i}/3");
handle.allow(1);
let rsp = send_req(svc.clone(), http_get());
serve_req(
serve(
&mut handle,
mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "bad"),
)
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn consecutive_failures_accrue() {
info!("Serving response");
tokio::time::timeout(
time::Duration::from_secs(10),
serve_req(
serve(
&mut handle,
mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "bad"),
),
Expand Down Expand Up @@ -199,7 +199,7 @@ async fn consecutive_failures_accrue() {
let rsp = send_req(svc.clone(), http_get());
tokio::time::timeout(
time::Duration::from_secs(10),
serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")),
serve(&mut handle, mk_rsp(StatusCode::OK, "good")),
)
.await
.expect("no timeouts");
Expand All @@ -211,7 +211,7 @@ async fn consecutive_failures_accrue() {
let rsp = send_req(svc.clone(), http_get());
tokio::time::timeout(
time::Duration::from_secs(10),
serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")),
serve(&mut handle, mk_rsp(StatusCode::OK, "good")),
)
.await
.expect("no timeouts");
Expand Down Expand Up @@ -285,11 +285,11 @@ async fn balancer_doesnt_select_tripped_breakers() {
info!(failed);
let rsp = send_req(svc.clone(), http_get());
let (expected_status, expected_body) = tokio::select! {
_ = serve_req(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")) => {
_ = serve(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")) => {
info!("Balancer selected good endpoint");
(StatusCode::OK, "endpoint 1")
}
_ = serve_req(&mut handle2, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "endpoint 2")) => {
_ = serve(&mut handle2, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "endpoint 2")) => {
info!("Balancer selected bad endpoint");
failed += 1;
(StatusCode::INTERNAL_SERVER_ERROR, "endpoint 2")
Expand All @@ -303,15 +303,15 @@ async fn balancer_doesnt_select_tripped_breakers() {
handle2.allow(1);
let rsp = send_req(svc.clone(), http_get());
// The load balancer will select endpoint 1, because endpoint 2 isn't ready.
serve_req(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await;
serve(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await;
assert_rsp(rsp, StatusCode::OK, "endpoint 1").await;

// The load balancer should continue selecting the non-failing endpoint.
for _ in 0..8 {
handle1.allow(1);
handle2.allow(1);
let rsp = send_req(svc.clone(), http_get());
serve_req(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await;
serve(&mut handle1, mk_rsp(StatusCode::OK, "endpoint 1")).await;
assert_rsp(rsp, StatusCode::OK, "endpoint 1").await;
}
}
Expand Down Expand Up @@ -383,7 +383,7 @@ impl<T: svc::Param<Remote<ServerAddr>>> svc::NewService<T> for HttpConnect {
}
}

// ===
// === Utils ===

#[track_caller]
fn send_req(
Expand Down Expand Up @@ -415,15 +415,15 @@ fn send_req(
async move { rsp.await.expect("request task must not panic") }
}

fn mk_rsp(status: StatusCode, body: impl ToString) -> Response {
http::Response::builder()
async fn mk_rsp(status: StatusCode, body: impl ToString) -> Result<Response> {
Ok(http::Response::builder()
.status(status)
.body(http::BoxBody::new(body.to_string()))
.unwrap()
.unwrap())
}

fn mk_grpc_rsp(code: tonic::Code) -> Response {
http::Response::builder()
async fn mk_grpc_rsp(code: tonic::Code) -> Result<Response> {
Ok(http::Response::builder()
.version(::http::Version::HTTP_2)
.header(
"content-type",
Expand All @@ -434,7 +434,7 @@ fn mk_grpc_rsp(code: tonic::Code) -> Response {
trls.insert("grpc-status", (code as u8).to_string().parse().unwrap());
Ok(Some(trls))
})))
.unwrap()
.unwrap())
}

async fn assert_rsp<T: std::fmt::Debug>(
Expand All @@ -452,14 +452,9 @@ async fn assert_rsp<T: std::fmt::Debug>(
assert_eq!(body, expected_body, "expected body to be {expected_body:?}");
}

async fn serve_req(handle: &mut tower_test::mock::Handle<Request, Response>, rsp: Response) {
serve_delayed(Duration::ZERO, handle, Ok(rsp)).await;
}

async fn serve_delayed(
delay: Duration,
async fn serve(
handle: &mut tower_test::mock::Handle<Request, Response>,
rsp: Result<Response>,
call: impl Future<Output = Result<Response>> + Send + 'static,
) {
let (mut req, tx) = handle
.next_request()
Expand All @@ -472,24 +467,20 @@ async fn serve_delayed(
while let Some(res) = req.body_mut().data().await {
res.expect("request body must not error");
}
}
if !req.body().is_end_stream() {
req.body_mut()
.trailers()
.await
.expect("request body must not error");
if !req.body().is_end_stream() {
req.body_mut()
.trailers()
.await
.expect("request body must not error");
}
}
drop(req);

tokio::spawn(
async move {
if delay > Duration::ZERO {
tracing::debug!(?delay, "Sleeping");
tokio::time::sleep(delay).await;
}

tracing::debug!(?rsp, "Sending response");
match rsp {
let res = call.await;
tracing::debug!(?res, "Sending response");
match res {
Ok(rsp) => tx.send_response(rsp),
Err(e) => tx.send_error(e),
}
Expand Down Expand Up @@ -547,3 +538,85 @@ fn default_route(backend: client_policy::Backend) -> client_policy::http::Route
}],
}
}

type Handle = tower_test::mock::Handle<Request, Response>;

fn mock_http(params: client_policy::http::RouteParams) -> (svc::BoxCloneHttp, Handle) {
let dest = "example.com:1234".parse::<NameAddr>().unwrap();
let backend = default_backend(&dest);
let route = mk_route(backend.clone(), params);
mock(policy::Params::Http(policy::HttpParams {
addr: dest.into(),
meta: ParentRef(client_policy::Meta::new_default("parent")),
backends: Arc::new([backend]),
routes: Arc::new([route]),
failure_accrual: client_policy::FailureAccrual::None,
}))
}

fn mock_grpc(params: client_policy::grpc::RouteParams) -> (svc::BoxCloneHttp, Handle) {
let dest = "example.com:1234".parse::<NameAddr>().unwrap();
let backend = default_backend(&dest);
let route = mk_route(backend.clone(), params);
mock(policy::Params::Grpc(policy::GrpcParams {
addr: dest.into(),
meta: ParentRef(client_policy::Meta::new_default("parent")),
backends: Arc::new([backend]),
routes: Arc::new([route]),
failure_accrual: client_policy::FailureAccrual::None,
}))
}

fn mock(params: policy::Params) -> (svc::BoxCloneHttp, Handle) {
let (inner, handle) = tower_test::mock::pair();

let addr = SocketAddr::new([192, 0, 2, 41].into(), 1234);
let connect = HttpConnect::default().service(addr, inner);
let resolve = support::resolver().endpoint_exists(
params.addr().name_addr().unwrap().clone(),
addr,
Default::default(),
);
let (rt, shutdown) = runtime();
let stack = Outbound::new(default_config(), rt, &mut Default::default())
.with_stack(svc::ArcNewService::new(connect))
.push_http_cached(resolve)
.into_inner();

let (tx, routes) = watch::channel(Routes::Policy(params));
tokio::spawn(async move {
tx.closed().await;
drop(shutdown);
});

let svc = stack.new_service(Target {
num: 1,
version: http::Version::H2,
routes,
});

(svc, handle)
}

fn mk_route<M: Default, F, P>(
backend: client_policy::Backend,
params: P,
) -> client_policy::route::Route<M, client_policy::RoutePolicy<F, P>> {
use client_policy::*;

route::Route {
hosts: vec![],
rules: vec![route::Rule {
matches: vec![M::default()],
policy: RoutePolicy {
meta: Meta::new_default("route"),
filters: [].into(),
distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend {
filters: [].into(),
backend,
}])),
params,
},
}],
}
}
Loading

0 comments on commit b705d45

Please sign in to comment.