diff --git a/linkerd/app/outbound/src/http/concrete/tests.rs b/linkerd/app/outbound/src/http/concrete/tests.rs index 42bf6b11a4..73446b5434 100644 --- a/linkerd/app/outbound/src/http/concrete/tests.rs +++ b/linkerd/app/outbound/src/http/concrete/tests.rs @@ -7,7 +7,7 @@ use linkerd_app_core::{ }; use linkerd_proxy_client_policy as policy; use std::{net::SocketAddr, num::NonZeroU16, sync::Arc}; -use tokio::time; +use tokio::{task, time}; #[tokio::test(flavor = "current_thread")] async fn gauges_endpoints() { @@ -49,6 +49,12 @@ async fn gauges_endpoints() { }, }); + // Run a background task that drives requests through the balancer as it is notified. + // + // XXX(ver) Discovery updates are only processed when the buffer is actively + // processing requests, so we need to drive requests through this test to + // update the gauge metrics. If the discovery processing logic changes, we + // can update this to test updates without processing requests. let ready = Arc::new(tokio::sync::Notify::new()); let _task = tokio::spawn({ let ready = ready.clone(); @@ -66,17 +72,23 @@ async fn gauges_endpoints() { .metrics .http_balancer .http_endpoints(svc::Param::param(&Target), svc::Param::param(&Target)); - assert_eq!(gauge.pending.value(), 0); - assert_eq!(gauge.ready.value(), 0); + assert_eq!( + (gauge.pending.value(), gauge.ready.value()), + (0, 0), + "No endpoints" + ); // Begin with a single endpoint. When the balancer can process requests, the // gauge is accurate. resolve_tx.add(vec![(ep0, Metadata::default())]).unwrap(); handle0.allow(1); ready.notify_one(); - tokio::task::yield_now().await; - assert_eq!(gauge.pending.value(), 0); - assert_eq!(gauge.ready.value(), 1); + task::yield_now().await; + assert_eq!( + (gauge.pending.value(), gauge.ready.value()), + (0, 1), + "After adding an endpoint one should be ready" + ); let (_, res) = handle0.next_request().await.unwrap(); res.send_response(http::Response::default()); @@ -85,9 +97,12 @@ async fn gauges_endpoints() { handle0.allow(0); handle1.allow(1); ready.notify_one(); - tokio::task::yield_now().await; - assert_eq!(gauge.pending.value(), 1); - assert_eq!(gauge.ready.value(), 1); + task::yield_now().await; + assert_eq!( + (gauge.pending.value(), gauge.ready.value()), + (1, 1), + "Added a pending endpoint" + ); let (_, res) = handle1.next_request().await.unwrap(); res.send_response(http::Response::default()); @@ -100,18 +115,24 @@ async fn gauges_endpoints() { // The inner endpoint isn't actually dropped until the balancer's subsequent poll. ready.notify_one(); - tokio::task::yield_now().await; - assert_eq!(gauge.pending.value(), 0); - assert_eq!(gauge.ready.value(), 1); + task::yield_now().await; + assert_eq!( + (gauge.pending.value(), gauge.ready.value()), + (0, 1), + "Removed an endpoint" + ); let (_, res) = handle1.next_request().await.unwrap(); res.send_response(http::Response::default()); // Dropping the remaining endpoint, the gauge is updated. resolve_tx.remove(vec![ep1]).unwrap(); ready.notify_one(); - tokio::task::yield_now().await; - assert_eq!(gauge.pending.value(), 0); - assert_eq!(gauge.ready.value(), 0); + task::yield_now().await; + assert_eq!( + (gauge.pending.value(), gauge.ready.value()), + (0, 0), + "Removed all endpoints" + ); } #[derive(Clone, Debug)]