Enable PoolQueue balancer (#2559)

This change culminates recent work to restructure the balancer to use a
PoolQueue so that balancer changes may occur independently of request
processing. The balancer task is now responsible for polling discovery
streams itself, replacing the previous standalone discovery buffering.
Requests are buffered and processed as soon as the pool has available
backends. Fail-fast circuit breaking is enforced on the balancer's
queue so that requests cannot get stuck in a queue indefinitely.

The new balancer is instrumented directly with metrics; the relevant
metric name prefix and label set are provided by the stack. These
include detailed queue metrics, such as request (in-queue) latency
histograms and failfast states, as well as discovery update counts and
balancer endpoint pool sizes.
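
To make the queue configuration concrete: a stack target advertises its
queue capacity and fail-fast timeout via `svc::Param`, and the PoolQueue
reads those params when it is built. Below is a minimal, self-contained
sketch of that pattern; the `Param` trait is inlined here for
illustration (the real code uses linkerd's stack traits), and `MyTarget`
and its constants are hypothetical stand-ins for `ControlAddr` as
changed in `linkerd/app/core/src/control.rs` below.

```rust
use std::time::Duration;

/// Thin stand-ins for linkerd's `svc::queue::Capacity` and
/// `svc::queue::Timeout` newtypes.
pub struct Capacity(pub usize);
pub struct Timeout(pub Duration);

/// Inlined stand-in for linkerd's `Param` stack trait.
pub trait Param<T> {
    fn param(&self) -> T;
}

/// Hypothetical stack target; `ControlAddr` plays this role in the diff.
pub struct MyTarget;

// The queue accepts up to this many buffered requests.
impl Param<Capacity> for MyTarget {
    fn param(&self) -> Capacity {
        Capacity(1_000)
    }
}

// Once the pool has had no available backend for this long, buffered
// requests fail fast instead of waiting indefinitely.
impl Param<Timeout> for MyTarget {
    fn param(&self) -> Timeout {
        const FAILFAST: Duration = Duration::from_secs(30);
        Timeout(FAILFAST)
    }
}
```
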
olix0r authored Dec 13, 2023
1 parent a349575 commit 8222d6e
Showing 21 changed files with 453 additions and 828 deletions.
55 changes: 45 additions & 10 deletions linkerd/app/core/src/control.rs
@@ -44,6 +44,19 @@ impl svc::Param<Addr> for ControlAddr {
}
}

impl svc::Param<svc::queue::Capacity> for ControlAddr {
fn param(&self) -> svc::queue::Capacity {
svc::queue::Capacity(1_000)
}
}

impl svc::Param<svc::queue::Timeout> for ControlAddr {
fn param(&self) -> svc::queue::Timeout {
const FAILFAST: time::Duration = time::Duration::from_secs(30);
svc::queue::Timeout(FAILFAST)
}
}

impl svc::Param<http::balance::EwmaConfig> for ControlAddr {
fn param(&self) -> http::balance::EwmaConfig {
EWMA_CONFIG
@@ -76,6 +89,7 @@ impl Config {
svc::BoxCloneSyncService<http::Request<tonic::body::BoxBody>, http::Response<RspBody>>,
> {
let addr = self.addr;
tracing::trace!(%addr, "Building");

// When a DNS resolution fails, log the error and use the TTL, if there
// is one, to drive re-resolution attempts.
@@ -121,11 +135,7 @@ impl Config {
.lift_new()
.push(self::balance::layer(registry, dns, resolve_backoff))
.push(metrics.to_layer::<classify::Response, _, _>())
.push(classify::NewClassify::layer_default())
// This buffer allows a resolver client to be shared across stacks.
// No load shed is applied here, however, so backpressure may leak
// into the caller task.
.push(svc::NewQueue::layer_via(self.buffer));
.push(classify::NewClassify::layer_default());

balance
.push(self::add_origin::layer())
@@ -159,7 +169,7 @@ impl From<(&self::client::Target, Error)> for EndpointError {
/// Sets the request's URI from `Config`.
mod add_origin {
use super::ControlAddr;
use linkerd_stack::{layer, NewService};
use crate::svc::{layer, NewService};
use std::task::{Context, Poll};

pub fn layer<M>() -> impl layer::Layer<M, Service = NewAddOrigin<M>> + Clone {
@@ -219,26 +229,36 @@ mod balance {
use super::{client::Target, ControlAddr};
use crate::{
dns,
metrics::prom::{self, encoding::EncodeLabelSet},
proxy::{dns_resolve::DnsResolve, http, resolve::recover},
svc, tls,
};
use linkerd_metrics::prom;
use linkerd_stack::ExtractParam;
use std::net::SocketAddr;

pub fn layer<B, R: Clone, N>(
_registry: &mut prom::Registry,
registry: &mut prom::Registry,
dns: dns::Resolver,
recover: R,
) -> impl svc::Layer<
N,
Service = http::NewBalancePeakEwma<B, recover::Resolve<R, DnsResolve>, NewIntoTarget<N>>,
Service = http::NewBalancePeakEwma<
B,
Params,
recover::Resolve<R, DnsResolve>,
NewIntoTarget<N>,
>,
> {
let resolve = recover::Resolve::new(recover, DnsResolve::new(dns));
let metrics = Params(http::balance::MetricFamilies::register(registry));
svc::layer::mk(move |inner| {
http::NewBalancePeakEwma::new(NewIntoTarget { inner }, resolve.clone())
http::NewBalancePeakEwma::new(NewIntoTarget { inner }, resolve.clone(), metrics.clone())
})
}

#[derive(Clone, Debug)]
pub struct Params(http::balance::MetricFamilies<Labels>);

#[derive(Clone, Debug)]
pub struct NewIntoTarget<N> {
inner: N,
@@ -250,6 +270,11 @@ mod balance {
server_id: tls::ConditionalClientTls,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Labels {
addr: String,
}

// === impl NewIntoTarget ===

impl<N: svc::NewService<ControlAddr>> svc::NewService<ControlAddr> for NewIntoTarget<N> {
@@ -273,6 +298,16 @@ mod balance {
.new_service(Target::new(addr, self.server_id.clone()))
}
}

// === impl Metrics ===

impl ExtractParam<http::balance::Metrics, ControlAddr> for Params {
fn extract_param(&self, tgt: &ControlAddr) -> http::balance::Metrics {
self.0.metrics(&Labels {
addr: tgt.addr.to_string(),
})
}
}
}

/// Creates a client suitable for gRPC.
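
The `Params`/`ExtractParam` pairing above is how the stack provides
per-target metrics: label families are registered once against the
Prometheus registry, and a labeled handle is extracted for each target.
A rough standalone sketch of that pattern using the prometheus-client
crate (recent API); the metric name, help text, and label fields here
are illustrative, not linkerd's:

```rust
use prometheus_client::{
    encoding::EncodeLabelSet,
    metrics::{counter::Counter, family::Family},
    registry::Registry,
};

// Per-balancer labels, mirroring the `Labels { addr }` struct in the diff.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Labels {
    addr: String,
}

// Registered once when the stack is built, like `MetricFamilies::register`.
#[derive(Clone)]
struct Params {
    updates: Family<Labels, Counter>,
}

impl Params {
    fn register(registry: &mut Registry) -> Self {
        let updates = Family::<Labels, Counter>::default();
        registry.register(
            "balancer_discovery_updates",
            "Discovery updates observed by a balancer",
            updates.clone(),
        );
        Self { updates }
    }

    // Extracted per target, like `ExtractParam<Metrics, ControlAddr>`:
    // each distinct address gets its own labeled time series.
    fn metrics(&self, addr: &str) -> Counter {
        self.updates
            .get_or_create(&Labels { addr: addr.to_string() })
            .clone()
    }
}

fn main() {
    let mut registry = Registry::default();
    let params = Params::register(&mut registry);
    params.metrics("10.0.0.1:8080").inc();
}
```
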
4 changes: 2 additions & 2 deletions linkerd/app/integration/src/tests/direct.rs
@@ -1,6 +1,6 @@
use crate::*;

#[tokio::test]
#[tokio::test(flavor = "current_thread")]
async fn h2_hinted() {
let _trace = trace_init();

@@ -44,7 +44,7 @@ async fn h2_hinted() {
/// `INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION` env var.
/// TODO(eliza): add a similar test where the policy on the opaque port is
/// discovered from the policy controller.
#[tokio::test]
#[tokio::test(flavor = "current_thread")]
async fn opaque_hinted() {
let _trace = trace_init();

23 changes: 20 additions & 3 deletions linkerd/app/outbound/src/http/concrete/balance.rs
@@ -1,6 +1,7 @@
use super::Endpoint;
use crate::{
http::{self, balance, breaker},
metrics::BalancerMetricsParams,
stack_labels, BackendRef, ParentRef,
};
use linkerd_app_core::{
@@ -57,6 +58,18 @@ impl<T> svc::Param<svc::queue::Timeout> for Balance<T> {
}
}

impl<T: svc::Param<ParentRef>> svc::Param<ParentRef> for Balance<T> {
fn param(&self) -> ParentRef {
self.parent.param()
}
}

impl<T: svc::Param<BackendRef>> svc::Param<BackendRef> for Balance<T> {
fn param(&self) -> BackendRef {
self.parent.param()
}
}

impl<T> Balance<T>
where
// Parent target.
@@ -68,7 +81,7 @@ where
pub(super) fn layer<N, NSvc, R>(
config: &crate::Config,
rt: &crate::Runtime,
_registry: &mut prom::Registry,
registry: &mut prom::Registry,
resolve: R,
) -> impl svc::Layer<N, Service = svc::ArcNewCloneHttp<Self>> + Clone
where
@@ -92,6 +105,8 @@ where
.push_map_target(|t: Self| ConcreteAddr(t.addr))
.into_inner();

let metrics_params = BalancerMetricsParams::register(registry);

svc::layer::mk(move |inner: N| {
let endpoint = svc::stack(inner)
.push_map_target({
@@ -140,11 +155,13 @@ where
.push(svc::ArcNewService::layer());

endpoint
.push(http::NewBalancePeakEwma::layer(resolve.clone()))
.push(http::NewBalancePeakEwma::layer(
resolve.clone(),
metrics_params.clone(),
))
.push_on_service(http::BoxResponse::layer())
.push_on_service(metrics.proxy.stack.layer(stack_labels("http", "balance")))
.push(svc::NewMapErr::layer_from_target::<BalanceError, _>())
.push(svc::NewQueue::layer())
.instrument(|t: &Self| {
let BackendRef(meta) = t.parent.param();
info_span!(
29 changes: 9 additions & 20 deletions linkerd/app/outbound/src/http/concrete/metrics.rs
@@ -1,11 +1,11 @@
use crate::{BackendRef, ParentRef};
use crate::{metrics::ConcreteLabels, BackendRef, ParentRef};
use ahash::AHashMap;
use linkerd_app_core::{
metrics::{metrics, FmtLabels, FmtMetrics, Gauge},
svc::http::balance,
};
use parking_lot::Mutex;
use std::{fmt::Write, sync::Arc};
use std::sync::Arc;

metrics! {
outbound_http_balancer_endpoints: Gauge {
@@ -15,22 +15,19 @@ metrics! {

#[derive(Clone, Debug, Default)]
pub struct BalancerMetrics {
balancers: Arc<Mutex<AHashMap<Labels, balance::EndpointsGauges>>>,
balancers: Arc<Mutex<AHashMap<ConcreteLabels, balance::EndpointsGauges>>>,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct Labels(ParentRef, BackendRef);

struct Ready<'l>(&'l Labels);
struct Pending<'l>(&'l Labels);
struct Ready<'l>(&'l ConcreteLabels);
struct Pending<'l>(&'l ConcreteLabels);

// === impl RouteBackendMetrics ===

impl BalancerMetrics {
pub(super) fn http_endpoints(&self, pr: ParentRef, br: BackendRef) -> balance::EndpointsGauges {
self.balancers
.lock()
.entry(Labels(pr, br))
.entry(ConcreteLabels(pr, br))
.or_default()
.clone()
}
@@ -58,17 +55,7 @@ impl FmtMetrics for BalancerMetrics {
}
}

impl FmtLabels for Labels {
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Labels(parent, backend) = self;

crate::metrics::write_service_meta_labels("parent", parent, f)?;
f.write_char(',')?;
crate::metrics::write_service_meta_labels("backend", backend, f)?;

Ok(())
}
}
// === impl Ready ===

impl<'l> FmtLabels for Ready<'l> {
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -77,6 +64,8 @@ impl<'l> FmtLabels for Ready<'l> {
}
}

// === impl Pending ===

impl<'l> FmtLabels for Pending<'l> {
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt_labels(f)?;
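
For reference, the gauge registry in this file follows a simple
shared-map pattern: gauges are handed out keyed by labels, cloned into
the balancer, and read at scrape time. A simplified, std-only sketch
under those assumptions (the string-pair key stands in for
`ConcreteLabels`, and this `Gauge` stands in for linkerd's):

```rust
use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc, Mutex,
    },
};

// A shared gauge: the balancer holds a clone and updates it; the metrics
// endpoint reads it at scrape time.
#[derive(Clone, Debug, Default)]
struct Gauge(Arc<AtomicU64>);

impl Gauge {
    fn set(&self, v: u64) {
        self.0.store(v, Ordering::Relaxed);
    }
    fn value(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

// Ready/pending endpoint gauges for one balancer, as in `EndpointsGauges`.
#[derive(Clone, Debug, Default)]
struct EndpointsGauges {
    ready: Gauge,
    pending: Gauge,
}

// Label-keyed registry; `ConcreteLabels(ParentRef, BackendRef)` plays the
// key role in the real code.
#[derive(Clone, Debug, Default)]
struct BalancerMetrics {
    balancers: Arc<Mutex<HashMap<(String, String), EndpointsGauges>>>,
}

impl BalancerMetrics {
    // Idempotent: repeated lookups for the same labels share one gauge pair.
    fn endpoints(&self, parent: &str, backend: &str) -> EndpointsGauges {
        self.balancers
            .lock()
            .unwrap()
            .entry((parent.to_string(), backend.to_string()))
            .or_default()
            .clone()
    }
}

fn main() {
    let metrics = BalancerMetrics::default();
    let g = metrics.endpoints("parent-a", "backend-b");
    g.ready.set(2);
    g.pending.set(1);
    assert_eq!((g.pending.value(), g.ready.value()), (1, 2));
}
```
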
63 changes: 16 additions & 47 deletions linkerd/app/outbound/src/http/concrete/tests.rs
@@ -1,10 +1,6 @@
use super::*;
use super::{balance::*, *};
use crate::test_util::*;
use linkerd_app_core::{
svc::{http::balance::EwmaConfig, ServiceExt},
svc::{NewService, Service},
trace,
};
use linkerd_app_core::{proxy::http::balance::EwmaConfig, svc::NewService, trace};
use linkerd_proxy_client_policy as policy;
use std::{net::SocketAddr, num::NonZeroU16, sync::Arc};
use tokio::{task, time};
@@ -37,8 +33,8 @@ async fn gauges_endpoints() {
panic!("unexpected endpoint: {:?}", ep)
};

let mut svc = svc::stack(stk)
.push(balance::Balance::layer(
let _svc = svc::stack(stk)
.push(Balance::layer(
&outbound.config,
&outbound.runtime,
&mut Default::default(),
@@ -49,33 +45,15 @@ async fn gauges_endpoints() {
addr,
parent: Target,
queue: QueueConfig {
capacity: 10,
failfast_timeout: time::Duration::from_secs(1),
capacity: 100,
failfast_timeout: time::Duration::from_secs(3),
},
ewma: EwmaConfig {
default_rtt: time::Duration::from_millis(100),
decay: time::Duration::from_secs(10),
},
});

// Run a background task that drives requests through the balancer as it is notified.
//
// XXX(ver) Discovery updates are only processed when the buffer is actively
// processing requests, so we need to drive requests through this test to
// update the gauge metrics. If the discovery processing logic changes, we
// can update this to test updates without processing requests.
let ready = Arc::new(tokio::sync::Notify::new());
let _task = tokio::spawn({
let ready = ready.clone();
async move {
loop {
ready.notified().await;
svc.ready().await.unwrap();
svc.call(http::Request::default()).await.unwrap();
}
}
});

let gauge = outbound
.runtime
.metrics
@@ -91,51 +69,42 @@ async fn gauges_endpoints() {
// gauge is accurate.
resolve_tx.add(vec![(ep0, Metadata::default())]).unwrap();
handle0.allow(1);
ready.notify_one();
task::yield_now().await;
assert_eq!(
(gauge.pending.value(), gauge.ready.value()),
(0, 1),
"After adding an endpoint one should be ready"
);
let (_, res) = handle0.next_request().await.unwrap();
res.send_response(http::Response::default());

// Add a second endpoint and ensure the gauge is updated.
resolve_tx.add(vec![(ep1, Metadata::default())]).unwrap();
handle0.allow(0);
handle1.allow(1);
ready.notify_one();
handle1.allow(0);
task::yield_now().await;
assert_eq!(
(gauge.pending.value(), gauge.ready.value()),
(1, 1),
"Added a pending endpoint"
);
let (_, res) = handle1.next_request().await.unwrap();
res.send_response(http::Response::default());

handle1.allow(1);
task::yield_now().await;
assert_eq!(
(gauge.pending.value(), gauge.ready.value()),
(0, 2),
"Pending endpoint became ready"
);

// Remove the first endpoint.
resolve_tx.remove(vec![ep0]).unwrap();
handle1.allow(2);
ready.notify_one();
let (_, res) = handle1.next_request().await.unwrap();
res.send_response(http::Response::default());

// The inner endpoint isn't actually dropped until the balancer's subsequent poll.
ready.notify_one();
task::yield_now().await;
assert_eq!(
(gauge.pending.value(), gauge.ready.value()),
(0, 1),
"Removed an endpoint"
"Removed first endpoint"
);
let (_, res) = handle1.next_request().await.unwrap();
res.send_response(http::Response::default());

// Dropping the remaining endpoint, the gauge is updated.
resolve_tx.remove(vec![ep1]).unwrap();
ready.notify_one();
task::yield_now().await;
assert_eq!(
(gauge.pending.value(), gauge.ready.value()),
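
A note on the test changes above: with
`#[tokio::test(flavor = "current_thread")]`, spawned background tasks
(here, the balancer task processing discovery updates) only make
progress when the test task yields, which is why the request-driving
helper was replaced by `task::yield_now().await`. A tiny hypothetical
test demonstrating that scheduling property (tokio with the `macros`
and `rt` features):

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

#[tokio::test(flavor = "current_thread")]
async fn background_work_runs_after_yield() {
    let hits = Arc::new(AtomicUsize::new(0));

    // Stand-in for the balancer task: on the current_thread runtime it can
    // only run when this test task yields, since there is a single worker.
    let bg = hits.clone();
    tokio::spawn(async move {
        bg.fetch_add(1, Ordering::SeqCst);
    });

    assert_eq!(hits.load(Ordering::SeqCst), 0, "background task not yet polled");

    // Yield once; the scheduler polls the spawned task before resuming us.
    tokio::task::yield_now().await;
    assert_eq!(hits.load(Ordering::SeqCst), 1, "background task ran");
}
```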