From 675fafb68d5a128010f23ce12ba03fbe465a2ef4 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 1 Dec 2023 04:13:56 +0000 Subject: [PATCH] balance: Add a p2c Pool implementation Following #2540, which introduces a new PoolQueue and Pool interface, this change introduces a P2cPool implementation that replaces Tower's p2c balancer (using the same underlying ReadyCache and p2c implementations). This balancer implementation is currently unused. It will be integrated in a follow-up change. --- Cargo.lock | 4 + linkerd/proxy/balance/Cargo.toml | 8 +- linkerd/proxy/balance/src/lib.rs | 2 + linkerd/proxy/balance/src/pool.rs | 4 + linkerd/proxy/balance/src/pool/p2c.rs | 351 ++++++++++++++++++++++++++ 5 files changed, 368 insertions(+), 1 deletion(-) create mode 100644 linkerd/proxy/balance/src/pool.rs create mode 100644 linkerd/proxy/balance/src/pool/p2c.rs diff --git a/Cargo.lock b/Cargo.lock index 833af50ffd..561f3bfe1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1680,13 +1680,16 @@ dependencies = [ name = "linkerd-proxy-balance" version = "0.1.0" dependencies = [ + "ahash", "futures", "futures-util", "indexmap", "linkerd-error", "linkerd-metrics", "linkerd-proxy-core", + "linkerd-proxy-pool", "linkerd-stack", + "parking_lot", "pin-project", "rand", "thiserror", @@ -1694,6 +1697,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tower", + "tower-test", "tracing", ] diff --git a/linkerd/proxy/balance/Cargo.toml b/linkerd/proxy/balance/Cargo.toml index 2fbc639d1c..a265266224 100644 --- a/linkerd/proxy/balance/Cargo.toml +++ b/linkerd/proxy/balance/Cargo.toml @@ -6,13 +6,16 @@ license = "Apache-2.0" publish = false [dependencies] +ahash = "0.8" futures = { version = "0.3", default-features = false } futures-util = "0.3" indexmap = "1" linkerd-error = { path = "../../error" } linkerd-metrics = { path = "../../metrics" } linkerd-proxy-core = { path = "../core" } +linkerd-proxy-pool = { path = "../pool" } linkerd-stack = { path = "../../stack" } +parking_lot = "0.12" pin-project = "1" rand = "0.8" thiserror = "1" @@ -24,4 +27,7 @@ tracing = "0.1" [dependencies.tower] version = "0.4.13" default-features = false -features = ["balance", "discover", "load"] +features = ["balance", "load", "ready-cache"] + +[dev-dependencies] +tower-test = "0.4" diff --git a/linkerd/proxy/balance/src/lib.rs b/linkerd/proxy/balance/src/lib.rs index eb9cda02c0..02f10ca649 100644 --- a/linkerd/proxy/balance/src/lib.rs +++ b/linkerd/proxy/balance/src/lib.rs @@ -11,10 +11,12 @@ use tower::{ mod discover; mod gauge_endpoints; +mod pool; pub use self::{ discover::DiscoveryStreamOverflow, gauge_endpoints::{EndpointsGauges, NewGaugeEndpoints}, + pool::P2cPool, }; pub use tower::load::peak_ewma::Handle; diff --git a/linkerd/proxy/balance/src/pool.rs b/linkerd/proxy/balance/src/pool.rs new file mode 100644 index 0000000000..04fd0ab2c2 --- /dev/null +++ b/linkerd/proxy/balance/src/pool.rs @@ -0,0 +1,4 @@ +mod p2c; + +pub use self::p2c::P2cPool; +pub use linkerd_proxy_pool::{Pool, Update}; diff --git a/linkerd/proxy/balance/src/pool/p2c.rs b/linkerd/proxy/balance/src/pool/p2c.rs new file mode 100644 index 0000000000..d83e19c0d5 --- /dev/null +++ b/linkerd/proxy/balance/src/pool/p2c.rs @@ -0,0 +1,351 @@ +//! A pool that uses the power-of-two-choices algorithm to select endpoints. +//! +// Based on tower::p2c::Balance. Copyright (c) 2019 Tower Contributors + +use super::{Pool, Update}; +use ahash::AHashMap; +use futures_util::TryFutureExt; +use linkerd_error::Error; +use linkerd_stack::{NewService, Service}; +use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng}; +use std::{ + collections::hash_map::Entry, + net::SocketAddr, + task::{Context, Poll}, +}; +use tower::{ + load::Load, + ready_cache::{error::Failed, ReadyCache}, +}; + +/// Dispatches requests to a pool of services selected by the +/// power-of-two-choices algorithm. +#[derive(Debug)] +pub struct P2cPool { + new_endpoint: N, + endpoints: AHashMap, + pool: ReadyCache, + rng: SmallRng, + next_idx: Option, +} + +impl P2cPool +where + T: Clone + Eq, + N: NewService<(SocketAddr, T), Service = S>, + S: Service + Load, + S::Error: Into, + S::Metric: std::fmt::Debug, +{ + pub fn new(new_endpoint: N) -> Self { + let rng = SmallRng::from_rng(&mut thread_rng()).expect("RNG must be seeded"); + Self { + rng, + new_endpoint, + next_idx: None, + pool: ReadyCache::default(), + endpoints: Default::default(), + } + } + + /// Resets the pool to include the given targets without unnecessarily + /// rebuilding inner services. + /// + /// Returns true if the pool was changed. + fn reset(&mut self, targets: Vec<(SocketAddr, T)>) -> bool { + let mut changed = false; + let mut remaining = std::mem::take(&mut self.endpoints); + for (addr, target) in targets.into_iter() { + remaining.remove(&addr); + match self.endpoints.entry(addr) { + Entry::Occupied(mut e) => { + if e.get() == &target { + continue; + } + e.insert(target.clone()); + } + Entry::Vacant(e) => { + e.insert(target.clone()); + } + } + let svc = self.new_endpoint.new_service((addr, target)); + self.pool.push(addr, svc); + changed = true; + } + for (addr, _) in remaining.drain() { + changed = self.pool.evict(&addr) || changed; + } + changed + } + + /// Adds endpoints to the pool without unnecessarily rebuilding inner + /// services. + /// + /// Returns true if the pool was changed. + fn add(&mut self, targets: Vec<(SocketAddr, T)>) -> bool { + let mut changed = false; + for (addr, target) in targets.into_iter() { + match self.endpoints.entry(addr) { + Entry::Occupied(mut e) => { + if e.get() == &target { + continue; + } + e.insert(target.clone()); + } + Entry::Vacant(e) => { + e.insert(target.clone()); + } + } + let svc = self.new_endpoint.new_service((addr, target)); + self.pool.push(addr, svc); + changed = true; + } + changed + } + + /// Removes endpoint services. + /// + /// Returns true if the pool was changed. + fn remove(&mut self, addrs: Vec) -> bool { + let mut changed = false; + for addr in addrs.into_iter() { + if self.endpoints.remove(&addr).is_some() { + changed = self.pool.evict(&addr) || changed; + } + } + changed + } + + /// Clear all endpoints from the pool. + /// + /// Returns true if the pool was changed. + fn clear(&mut self) -> bool { + let mut changed = false; + for (addr, _) in self.endpoints.drain() { + changed = self.pool.evict(&addr) || changed; + } + changed + } + + fn p2c_ready_index(&mut self) -> Option { + match self.pool.ready_len() { + 0 => None, + 1 => Some(0), + len => { + // Get two distinct random indexes (in a random order) and + // compare the loads of the service at each index. + let aidx = self.rng.gen_range(0..len); + let mut bidx = self.rng.gen_range(0..(len - 1)); + if bidx >= aidx { + bidx += 1; + } + debug_assert_ne!(aidx, bidx, "random indices must be distinct"); + + let aload = self.ready_index_load(aidx); + let bload = self.ready_index_load(bidx); + let chosen = if aload <= bload { aidx } else { bidx }; + + tracing::trace!( + a.index = aidx, + a.load = ?aload, + b.index = bidx, + b.load = ?bload, + chosen = if chosen == aidx { "a" } else { "b" }, + "p2c", + ); + Some(chosen) + } + } + } + + /// Accesses a ready endpoint by index and returns its current load. + fn ready_index_load(&self, index: usize) -> S::Metric { + let (_, svc) = self.pool.get_ready_index(index).expect("invalid index"); + svc.load() + } +} + +impl Pool for P2cPool +where + T: Clone + Eq + std::fmt::Debug, + N: NewService<(SocketAddr, T), Service = S>, + S: Service + Load, + S::Error: Into, + S::Future: Send + 'static, + S::Metric: std::fmt::Debug, +{ + fn update_pool(&mut self, update: Update) { + tracing::trace!(?update); + let changed = match update { + Update::Reset(targets) => self.reset(targets), + Update::Add(targets) => self.add(targets), + Update::Remove(addrs) => self.remove(addrs), + Update::DoesNotExist => self.clear(), + }; + if changed { + self.next_idx = None; + } + } + + /// Moves pending endpoints to ready. + /// + /// This must be called from the same task that invokes Service::poll_ready. + fn poll_pool( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + tracing::trace!("Polling pending"); + self.pool.poll_pending(cx).map_err(|Failed(_, e)| e) + } +} + +impl Service for P2cPool +where + T: Clone + Eq, + N: NewService<(SocketAddr, T), Service = S>, + S: Service + Load, + S::Error: Into, + S::Future: Send + 'static, + S::Metric: std::fmt::Debug, +{ + type Response = S::Response; + type Error = Error; + type Future = futures::future::ErrInto; + + /// Returns ready when at least one endpoint is ready. + /// + /// If multiple endpoints are ready, the power-of-two-choices algorithm is + /// used to select one. + /// + /// NOTE that this may return `Pending` when there are no endpoints. In such + /// cases, the caller must invoke `update_pool` and then wait for new + /// endpoints to become ready. + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + tracing::trace!(pending = self.pool.pending_len(), "Polling pending"); + match self.pool.poll_pending(cx)? { + Poll::Ready(()) => tracing::trace!("All endpoints are ready"), + Poll::Pending => tracing::trace!("Endpoints are pending"), + } + + let idx = match self.next_idx.take().or_else(|| self.p2c_ready_index()) { + Some(idx) => idx, + None => { + tracing::debug!("No ready endpoints"); + return Poll::Pending; + } + }; + + tracing::trace!(ready.index = idx, "Selected"); + if !self.pool.check_ready_index(cx, idx)? { + tracing::trace!(ready.index = idx, "Reverted to pending"); + continue; + } + + tracing::trace!(ready.index = idx, "Ready"); + self.next_idx = Some(idx); + return Poll::Ready(Ok(())); + } + } + + fn call(&mut self, req: Req) -> Self::Future { + let idx = self.next_idx.take().expect("call before ready"); + self.pool.call_ready_index(idx, req).err_into() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::prelude::*; + use linkerd_stack::ServiceExt; + use tokio::time; + use tower::load::{CompleteOnResponse, PeakEwma}; + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn update_pool() { + let addr0 = "192.168.10.10:80".parse().unwrap(); + let addr1 = "192.168.10.11:80".parse().unwrap(); + + let mut pool = P2cPool::new(|(_addr, _n)| { + PeakEwma::new( + linkerd_stack::service_fn(|()| { + std::future::ready(Ok::<_, std::convert::Infallible>(())) + }), + time::Duration::from_secs(1), + 1.0 * 1000.0 * 1000.0, + CompleteOnResponse::default(), + ) + }); + + pool.update_pool(Update::Reset(vec![(addr0, 0)])); + assert_eq!(pool.endpoints.len(), 1); + assert_eq!(pool.endpoints.get(&addr0), Some(&0)); + + pool.update_pool(Update::Add(vec![(addr0, 1)])); + assert_eq!(pool.endpoints.len(), 1); + assert_eq!(pool.endpoints.get(&addr0), Some(&1)); + + pool.update_pool(Update::Add(vec![(addr1, 1)])); + assert_eq!(pool.endpoints.len(), 2); + assert_eq!(pool.endpoints.get(&addr1), Some(&1)); + + pool.update_pool(Update::Remove(vec![addr0])); + assert_eq!(pool.endpoints.len(), 1); + + pool.update_pool(Update::Reset(vec![(addr0, 2), (addr1, 2)])); + assert_eq!(pool.endpoints.len(), 2); + assert_eq!(pool.endpoints.get(&addr0), Some(&2)); + assert_eq!(pool.endpoints.get(&addr1), Some(&2)); + + pool.update_pool(Update::DoesNotExist); + assert_eq!(pool.endpoints.len(), 0); + } + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn p2c_ready_index() { + let addr0 = "192.168.10.10:80".parse().unwrap(); + let (svc0, mut h0) = tower_test::mock::pair::<(), ()>(); + h0.allow(0); + + let addr1 = "192.168.10.11:80".parse().unwrap(); + let (svc1, mut h1) = tower_test::mock::pair::<(), ()>(); + h1.allow(0); + + let addr2 = "192.168.10.12:80".parse().unwrap(); + let (svc2, mut h2) = tower_test::mock::pair::<(), ()>(); + h2.allow(0); + + let mut pool = P2cPool::new(|(a, ())| { + PeakEwma::new( + if a == addr0 { + svc0.clone() + } else if a == addr1 { + svc1.clone() + } else if a == addr2 { + svc2.clone() + } else { + panic!("unexpected address: {a}"); + }, + time::Duration::from_secs(1), + 1.0 * 1000.0 * 1000.0, + CompleteOnResponse::default(), + ) + }); + + pool.update_pool(Update::Reset(vec![(addr0, ())])); + assert!(pool.ready().now_or_never().is_none()); + assert!(pool.next_idx.is_none()); + + h0.allow(1); + assert!(pool.ready().now_or_never().is_some()); + assert_eq!(pool.next_idx, Some(0)); + + h1.allow(1); + h2.allow(1); + pool.update_pool(Update::Reset(vec![(addr0, ()), (addr1, ()), (addr2, ())])); + assert!(pool.next_idx.is_none()); + assert!(pool.ready().now_or_never().is_some()); + assert!(pool.next_idx.is_some()); + } +}