From 070931d2457a03ee1f4961b38dd6c6fa66989527 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 22 Nov 2024 16:20:49 +0100 Subject: [PATCH 01/18] feat(iroh-relay): Rate-limit client connections This has a hardcoded rate-limiter. Need to make it configurable. --- Cargo.lock | 1 + iroh-relay/Cargo.toml | 1 + iroh-relay/src/client/conn.rs | 9 +- iroh-relay/src/protos/disco.rs | 1 + iroh-relay/src/protos/relay.rs | 13 ++ iroh-relay/src/server/actor.rs | 6 +- iroh-relay/src/server/client_conn.rs | 235 ++++++++++++++++++++++++++- iroh-relay/src/server/clients.rs | 2 +- iroh-relay/src/server/http_server.rs | 51 +++--- iroh-relay/src/server/streams.rs | 3 + 10 files changed, 277 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 28c9d69eb4..a8906b50e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3249,6 +3249,7 @@ dependencies = [ "smallvec", "socket2", "stun-rs", + "testresult", "thiserror 1.0.68", "time", "tokio", diff --git a/iroh-relay/Cargo.toml b/iroh-relay/Cargo.toml index 3f24f2a717..d04da0ce8a 100644 --- a/iroh-relay/Cargo.toml +++ b/iroh-relay/Cargo.toml @@ -99,6 +99,7 @@ clap = { version = "4", features = ["derive"] } crypto_box = { version = "0.9.1", features = ["serde", "chacha20"] } proptest = "1.2.0" rand_chacha = "0.3.1" +testresult = "0.4.0" tokio = { version = "1", features = [ "io-util", "sync", diff --git a/iroh-relay/src/client/conn.rs b/iroh-relay/src/client/conn.rs index fde38b0f2c..020727743b 100644 --- a/iroh-relay/src/client/conn.rs +++ b/iroh-relay/src/client/conn.rs @@ -510,7 +510,7 @@ pub(crate) async fn send_packet + Unpin>( }; if let Some(rate_limiter) = rate_limiter { if rate_limiter.check_n(frame.len()).is_err() { - tracing::warn!("dropping send: rate limit reached"); + tracing::debug!("dropping send: rate limit reached"); return Ok(()); } } @@ -521,12 +521,7 @@ pub(crate) async fn send_packet + Unpin>( } pub(crate) struct RateLimiter { - inner: governor::RateLimiter< - governor::state::direct::NotKeyed, - governor::state::InMemoryState, - governor::clock::DefaultClock, - governor::middleware::NoOpMiddleware, - >, + inner: governor::DefaultDirectRateLimiter, } impl RateLimiter { diff --git a/iroh-relay/src/protos/disco.rs b/iroh-relay/src/protos/disco.rs index 3f54269f83..64af8187d7 100644 --- a/iroh-relay/src/protos/disco.rs +++ b/iroh-relay/src/protos/disco.rs @@ -10,6 +10,7 @@ pub(crate) const MAGIC_LEN: usize = MAGIC.as_bytes().len(); pub(crate) const KEY_LEN: usize = 32; const MESSAGE_HEADER_LEN: usize = MAGIC_LEN + KEY_LEN; + /// Reports whether p looks like it's a packet containing an encrypted disco message. pub fn looks_like_disco_wrapper(p: &[u8]) -> bool { if p.len() < MESSAGE_HEADER_LEN { diff --git a/iroh-relay/src/protos/relay.rs b/iroh-relay/src/protos/relay.rs index ef50a9bb07..6145b8d6c2 100644 --- a/iroh-relay/src/protos/relay.rs +++ b/iroh-relay/src/protos/relay.rs @@ -18,6 +18,9 @@ use tokio_util::codec::{Decoder, Encoder}; /// including its on-wire framing overhead) pub const MAX_PACKET_SIZE: usize = 64 * 1024; +/// The maximum frame size. +/// +/// This is also the minimum burst size that a rate-limiter has to accept. const MAX_FRAME_SIZE: usize = 1024 * 1024; /// The Relay magic number, sent in the FrameType::ClientInfo frame upon initial connection. @@ -200,9 +203,14 @@ pub(crate) async fn recv_client_key> + Un } } +/// The protocol for the relay server. +/// +/// This is a framed protocol, using [`tokio_util::codec`] to turn the streams of bytes into +/// [`Frame`]s. #[derive(Debug, Default, Clone)] pub(crate) struct DerpCodec; +/// The frames in the [`DerpCodec`]. #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum Frame { ClientInfo { @@ -279,6 +287,11 @@ impl Frame { } } + /// Serialized length with frame header. + pub(crate) fn len_with_header(&self) -> usize { + self.len() + HEADER_LEN + } + /// Tries to decode a frame received over websockets. /// /// Specifically, bytes received from a binary websocket message frame. diff --git a/iroh-relay/src/server/actor.rs b/iroh-relay/src/server/actor.rs index 9bd7f72d25..fb3c69aedb 100644 --- a/iroh-relay/src/server/actor.rs +++ b/iroh-relay/src/server/actor.rs @@ -183,12 +183,8 @@ impl Actor { } Message::CreateClient(client_builder) => { inc!(Metrics, accepts); - - trace!( - node_id = client_builder.node_id.fmt_short(), - "create client" - ); let node_id = client_builder.node_id; + trace!(node_id = node_id.fmt_short(), "create client"); // build and register client, starting up read & write loops for the client // connection diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index c0567910f8..35343daf87 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -1,16 +1,16 @@ //! The server-side representation of an ongoing client relaying connection. -use std::time::Duration; +use std::{future::Future, num::NonZeroU32, pin::Pin, sync::Arc, task::Poll, time::Duration}; use anyhow::{Context, Result}; use bytes::Bytes; -use futures_lite::StreamExt; -use futures_util::SinkExt; +use futures_sink::Sink; +use futures_util::{SinkExt, Stream, StreamExt}; use iroh_base::key::NodeId; use iroh_metrics::{inc, inc_by}; use tokio::sync::mpsc; use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; -use tracing::{info, trace, warn, Instrument}; +use tracing::{error, info, instrument, trace, warn, Instrument}; use crate::{ protos::{ @@ -69,6 +69,13 @@ impl ClientConn { server_channel, } = config; + // TODO make the values configurable + let bytes_per_second = NonZeroU32::new(1 << 12).expect("nonzero"); // 4 KiB / s + let burst_bytes = NonZeroU32::new(1 << 22).expect("nonzero"); // 4 MiB + let quota = governor::Quota::per_second(bytes_per_second).allow_burst(burst_bytes); + let rate_limiter = governor::RateLimiter::direct(quota); + let stream = RateLimitedRelayedStream::new(io, rate_limiter); + let done = CancellationToken::new(); let client_id = (key, conn_num); let (send_queue_s, send_queue_r) = mpsc::channel(channel_capacity); @@ -77,7 +84,7 @@ impl ClientConn { let (peer_gone_s, peer_gone_r) = mpsc::channel(channel_capacity); let actor = Actor { - stream: io, + stream, timeout: write_timeout, send_queue: send_queue_r, disco_send_queue: disco_send_queue_r, @@ -162,7 +169,7 @@ impl ClientConn { #[derive(Debug)] struct Actor { /// IO Stream to talk to the client - stream: RelayedStream, + stream: RateLimitedRelayedStream, /// Maximum time we wait to complete a write to the client timeout: Duration, /// Packets queued to send to the client @@ -317,10 +324,153 @@ impl Actor { } } +/// Rate limiter for reading from a [`RelayedStream`]. +/// +/// The writes to the sink are not rate limited. +/// +/// This potentially buffers one frame if the rate limiter does not allows this frame. +/// While the frame is buffered the undernlying stream is no longer polled. +#[derive(derive_more::Debug)] +struct RateLimitedRelayedStream { + inner: RelayedStream, + limiter: Arc, + #[debug("Option>>>")] + delay: Option + Send + Sync>>>, + buf: Option>, +} + +impl RateLimitedRelayedStream { + fn new(inner: RelayedStream, limiter: governor::DefaultDirectRateLimiter) -> Self { + Self { + inner, + limiter: Arc::new(limiter), + delay: None, + buf: None, + } + } +} + +impl Stream for RateLimitedRelayedStream { + type Item = anyhow::Result; + + #[instrument(name = "rate_limited_relayed_stream", skip_all)] + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + loop { + // If we have a delay installed, we need to await it. + if let Some(ref mut wait_fut) = self.delay { + tokio::pin!(wait_fut); + match wait_fut.poll(cx) { + Poll::Ready(_) => { + self.delay.take(); + continue; + } + Poll::Pending => return Poll::Pending, + } + } + // If we have an item buffered, check if we can yield it. + if let Some(ref item) = self.buf { + match item { + Err(_) => { + // Yielding errors is not rate-limited. + match self.buf.take() { + Some(item) => return Poll::Ready(Some(item)), + None => continue, // unreachable + } + } + Ok(frame) => { + // First we need to know how many bytes this frame consumes. + let Ok(frame_len) = TryInto::::try_into(frame.len_with_header()) + .and_then(|len| TryInto::::try_into(len)) + else { + error!("frame len not NonZeroU32, is MAX_FRAME_SIZE too large?"); + // Let this frame through anyway so to not completely break. + match self.buf.take() { + Some(item) => return Poll::Ready(Some(item)), + None => continue, // unreachable + } + }; + + // Now check the rate limiter. + match self.limiter.check_n(frame_len) { + Ok(Ok(_)) => { + // Item not rate-limited, yield it. + match self.buf.take() { + Some(frame) => return Poll::Ready(Some(frame)), + None => continue, // unreachable + } + } + Ok(Err(_until)) => { + // Item is rate-limited, install a delay future. + let limiter = self.limiter.clone(); + let fut = async move { + limiter.until_n_ready(frame_len).await.ok(); + }; + self.delay = Some(Box::pin(fut)); + continue; + } + Err(_insufficient_capacity) => { + error!("frame larger than bucket capacity, accepting frame"); + // Let this frame through since this is misconfigured. + match self.buf.take() { + Some(item) => return Poll::Ready(Some(item)), + None => continue, // unreachable + } + } + } + } + } + } + // If we have neither a delay future or a buffered item, poll for a new item. + match Pin::new(&mut self.inner).poll_next(cx) { + Poll::Ready(Some(item)) => { + self.buf = Some(item); + continue; + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + } +} + +impl Sink for RateLimitedRelayedStream { + type Error = std::io::Error; + + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_ready(cx) + } + + fn start_send(mut self: Pin<&mut Self>, item: Frame) -> std::result::Result<(), Self::Error> { + Pin::new(&mut self.inner).start_send(item) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) + } +} + #[cfg(test)] mod tests { use anyhow::bail; + use bytes::Bytes; use iroh_base::key::SecretKey; + use testresult::TestResult; use tokio_util::codec::Framed; use super::*; @@ -340,9 +490,12 @@ mod tests { let (io, io_rw) = tokio::io::duplex(1024); let mut io_rw = Framed::new(io_rw, DerpCodec); let (server_channel_s, mut server_channel_r) = mpsc::channel(10); + let quota = governor::Quota::per_second(NonZeroU32::MAX); + let limiter = governor::RateLimiter::direct(quota); + let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)); let actor = Actor { - stream: RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)), + stream: RateLimitedRelayedStream::new(stream, limiter), timeout: Duration::from_secs(1), send_queue: send_queue_r, disco_send_queue: disco_send_queue_r, @@ -479,10 +632,13 @@ mod tests { let (io, io_rw) = tokio::io::duplex(1024); let mut io_rw = Framed::new(io_rw, DerpCodec); let (server_channel_s, mut server_channel_r) = mpsc::channel(10); + let quota = governor::Quota::per_second(NonZeroU32::MAX); + let limiter = governor::RateLimiter::direct(quota); + let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)); println!("-- create client conn"); let actor = Actor { - stream: RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)), + stream: RateLimitedRelayedStream::new(stream, limiter), timeout: Duration::from_secs(1), send_queue: send_queue_r, disco_send_queue: disco_send_queue_r, @@ -542,4 +698,67 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_rate_limit() -> TestResult { + let _logging = iroh_test::logging::setup(); + let (_send_queue_s, send_queue_r) = mpsc::channel(10); + let (_disco_send_queue_s, disco_send_queue_r) = mpsc::channel(10); + let (_peer_gone_s, peer_gone_r) = mpsc::channel(10); + + let key = SecretKey::generate().public(); + let (io, io_rw) = tokio::io::duplex(1024); + let mut io_rw = Framed::new(io_rw, DerpCodec); + let (server_channel_s, mut server_channel_r) = mpsc::channel(10); + + // We are only allowed to send 32 bytes per minute + const LIMIT: u32 = 50; + let quota = governor::Quota::per_minute(NonZeroU32::try_from(LIMIT)?); + let limiter = governor::RateLimiter::direct(quota); + let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)); + + println!("-- create client conn"); + let actor = Actor { + stream: RateLimitedRelayedStream::new(stream, limiter), + timeout: Duration::from_secs(1), + send_queue: send_queue_r, + disco_send_queue: disco_send_queue_r, + node_gone: peer_gone_r, + key, + server_channel: server_channel_s, + preferred: true, + }; + + let done = CancellationToken::new(); + let io_done = done.clone(); + + info!("-- run client conn"); + let handle = tokio::task::spawn(async move { actor.run(io_done).await }); + let _handle = AbortOnDropHandle::new(handle); + + // Prepare a packet to send. + let data = Bytes::from_static(b"hello world!"); + let target = SecretKey::generate().public(); + + // Assert the frame * 2 is over our limit. + let frame = Frame::SendPacket { + dst_key: target, + packet: data.clone(), + }; + let frame_len = frame.len_with_header(); + assert!(frame_len * 2 > LIMIT as usize); + info!("-- send packet with {frame_len} bytes"); + + // Send a packet, it should arrive. + conn::send_packet(&mut io_rw, &None, target, data.clone()).await?; + let msg = server_channel_r.recv().await.context("actor died?")?; + assert!(matches!(msg, actor::Message::SendPacket { .. })); + + // Send another packet, it should not arrive + conn::send_packet(&mut io_rw, &None, target, data).await?; + let ret = tokio::time::timeout(Duration::from_secs(1), server_channel_r.recv()).await; + assert!(ret.is_err()); + + Ok(()) + } } diff --git a/iroh-relay/src/server/clients.rs b/iroh-relay/src/server/clients.rs index 98c96b6ef9..8544069132 100644 --- a/iroh-relay/src/server/clients.rs +++ b/iroh-relay/src/server/clients.rs @@ -63,8 +63,8 @@ impl Clients { conn_num } + /// Builds the client handler and starts the read & write loops for the connection. pub async fn register(&mut self, client_config: ClientConnConfig) { - // this builds the client handler & starts the read & write loops to that client connection let key = client_config.node_id; trace!("registering client: {:?}", key); let conn_num = self.next_conn_num(); diff --git a/iroh-relay/src/server/http_server.rs b/iroh-relay/src/server/http_server.rs index 60e05a9d34..6a7e7649eb 100644 --- a/iroh-relay/src/server/http_server.rs +++ b/iroh-relay/src/server/http_server.rs @@ -270,7 +270,20 @@ impl ServerBuilder { } } +/// The hyper Service that serves the actual relay endpoints. +#[derive(Clone, Debug)] +struct RelayService(Arc); + +#[derive(Debug)] +struct Inner { + handlers: Handlers, + headers: HeaderMap, + server_channel: mpsc::Sender, + write_timeout: Duration, +} + impl RelayService { + /// Upgrades the HTTP connection to the relay protocol, runs relay client. fn call_client_conn( &self, mut req: Request, @@ -325,7 +338,7 @@ impl RelayService { None }; - debug!("upgrading protocol: {:?}", protocol); + debug!(?protocol, "upgrading connection"); // Setup a future that will eventually receive the upgraded // connection and talk a new protocol, and spawn the future @@ -338,19 +351,18 @@ impl RelayService { async move { match hyper::upgrade::on(&mut req).await { Ok(upgraded) => { - if let Err(e) = + if let Err(err) = this.0.relay_connection_handler(protocol, upgraded).await { warn!( - "upgrade to \"{}\": io error: {:?}", - e, - protocol.upgrade_header() + ?protocol, + "error accepting upgraded connection: {err:#}", ); } else { - debug!("upgrade to \"{}\" success", protocol.upgrade_header()); + debug!(?protocol, "upgraded connection completed"); }; } - Err(e) => warn!("upgrade error: {:?}", e), + Err(err) => warn!("upgrade error: {err:#}"), } } .instrument(debug_span!("handler")), @@ -383,41 +395,28 @@ impl Service> for RelayService { type Future = Pin> + Send>>; fn call(&self, req: Request) -> Self::Future { - // if the request hits the relay endpoint - // or /derp for backwards compat + // Create a client if the request hits the relay endpoint. if matches!( (req.method(), req.uri().path()), (&hyper::Method::GET, LEGACY_RELAY_PATH | RELAY_PATH) ) { let this = self.clone(); - // otherwise handle the relay connection as normal return Box::pin(async move { this.call_client_conn(req).await.map_err(Into::into) }); } + // Otherwise handle the relay connection as normal. - // check all other possible endpoints + // Check all other possible endpoints. let uri = req.uri().clone(); if let Some(res) = self.0.handlers.get(&(req.method().clone(), uri.path())) { let f = res(req, self.0.default_response()); return Box::pin(async move { f }); } - // otherwise return 404 + // Otherwise return 404 let res = self.0.not_found_fn(req, self.0.default_response()); Box::pin(async move { res }) } } -/// The hyper Service that servers the actual relay endpoints -#[derive(Clone, Debug)] -struct RelayService(Arc); - -#[derive(Debug)] -struct Inner { - handlers: Handlers, - headers: HeaderMap, - server_channel: mpsc::Sender, - write_timeout: Duration, -} - impl Inner { fn default_response(&self) -> ResponseBuilder { let mut response = Response::builder(); @@ -441,6 +440,10 @@ impl Inner { } /// The server HTTP handler to do HTTP upgrades. + /// + /// This handler runs while doing the connection upgrade handshake. Once the connection + /// is upgraded it sends the stream to the relay server which takes it over. After + /// having sent off the connection this handler returns. async fn relay_connection_handler(&self, protocol: Protocol, upgraded: Upgraded) -> Result<()> { debug!(?protocol, "relay_connection upgraded"); let (io, read_buf) = downcast_upgrade(upgraded)?; diff --git a/iroh-relay/src/server/streams.rs b/iroh-relay/src/server/streams.rs index 844f5ca92b..ac7247c67a 100644 --- a/iroh-relay/src/server/streams.rs +++ b/iroh-relay/src/server/streams.rs @@ -14,6 +14,9 @@ use tokio_util::codec::Framed; use crate::protos::relay::{DerpCodec, Frame}; +/// A Stream and Sink for [`Frame`]s connected to a single relay client. +/// +/// The stream receives message from the client while the sink sends them to the client. #[derive(Debug)] pub(crate) enum RelayedStream { Derp(Framed), From 8e669890b7a821e67b4e75acf3400ab0634cc5a3 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 25 Nov 2024 12:55:31 +0100 Subject: [PATCH 02/18] Allow configuring the rate limits --- iroh-relay/src/main.rs | 88 +++++++++++++++++----------- iroh-relay/src/server.rs | 35 ++++++++++- iroh-relay/src/server/actor.rs | 2 + iroh-relay/src/server/client_conn.rs | 12 ++-- iroh-relay/src/server/clients.rs | 6 +- iroh-relay/src/server/http_server.rs | 57 ++++++++++++------ 6 files changed, 141 insertions(+), 59 deletions(-) diff --git a/iroh-relay/src/main.rs b/iroh-relay/src/main.rs index aad6b95352..86a9fe5d0b 100644 --- a/iroh-relay/src/main.rs +++ b/iroh-relay/src/main.rs @@ -5,14 +5,15 @@ use std::{ net::{Ipv6Addr, SocketAddr}, + num::NonZeroU32, path::{Path, PathBuf}, }; -use anyhow::{anyhow, bail, Context as _, Result}; +use anyhow::{bail, Context as _, Result}; use clap::Parser; use iroh_relay::{ defaults::{DEFAULT_HTTPS_PORT, DEFAULT_HTTP_PORT, DEFAULT_METRICS_PORT, DEFAULT_STUN_PORT}, - server as relay, + server::{self as relay, ClientConnRateLimit}, }; use serde::{Deserialize, Serialize}; use tokio_rustls_acme::{caches::DirCache, AcmeConfig}; @@ -282,6 +283,29 @@ struct Limits { accept_conn_limit: Option, /// Burst limit for accepting new connection. Unlimited if not set. accept_conn_burst: Option, + /// Rate limiting configuration per client. + client: Option, +} + +/// Rate limit configuration for each connected client. +/// +/// The rate limiting uses a token-bucket style algorithm: +/// +/// - The base rate limit uses a steady-stream rate of bytes allowed. +/// - Additionally a burst quota allows sending bytes over this steady-stream rate +/// limit, as long as the maximum burst quota is not exceeded. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +struct PerClientRateLimitConfig { + /// Rate limit configuration for the incoming data from the client. + rx: Option, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +struct RateLimitConfig { + /// Maximum number of bytes per second. + bytes_per_second: Option, + /// Maximum number of bytes to read in a single burst. + max_burst_bytes: Option, } impl Config { @@ -295,10 +319,7 @@ impl Config { if config_path.exists() { Self::read_from_file(&config_path).await } else { - let config = Config::default(); - config.write_to_file(&config_path).await?; - - Ok(config) + Ok(Config::default()) } } @@ -313,24 +334,6 @@ impl Config { Ok(config) } - - /// Write the content of this configuration to the provided path. - async fn write_to_file(&self, path: impl AsRef) -> Result<()> { - let p = path - .as_ref() - .parent() - .ok_or_else(|| anyhow!("invalid config file path, no parent"))?; - // TODO: correct permissions (0777 for dir, 0600 for file) - tokio::fs::create_dir_all(p) - .await - .with_context(|| format!("unable to create config-path dir: {}", p.display()))?; - let config_ser = toml::to_string(self).context("unable to serialize configuration")?; - tokio::fs::write(path, config_ser) - .await - .context("unable to write config file")?; - - Ok(()) - } } #[tokio::main] @@ -402,17 +405,32 @@ async fn build_relay_config(cfg: Config) -> Result None, }; - let limits = relay::Limits { - accept_conn_limit: cfg - .limits - .as_ref() - .map(|l| l.accept_conn_limit) - .unwrap_or_default(), - accept_conn_burst: cfg - .limits - .as_ref() - .map(|l| l.accept_conn_burst) - .unwrap_or_default(), + let limits = match cfg.limits { + Some(ref limits) => { + let client_rx = match &limits.client { + Some(PerClientRateLimitConfig { rx: Some(rx) }) => { + let mut cfg = ClientConnRateLimit::default(); + if let Some(bps) = rx.bytes_per_second { + let v = NonZeroU32::try_from(bps) + .context("bytes_per_second must be non-zero u32")?; + cfg.bytes_per_second = v; + } + if let Some(burst) = rx.max_burst_bytes { + let v = NonZeroU32::try_from(burst) + .context("max_burst_bytes must be non-zero u32")?; + cfg.max_burst_bytes = v; + } + cfg + } + Some(PerClientRateLimitConfig { rx: None }) | None => Default::default(), + }; + relay::Limits { + accept_conn_limit: limits.accept_conn_limit, + accept_conn_burst: limits.accept_conn_burst, + client_rx, + } + } + None => Default::default(), }; let relay_config = relay::RelayConfig { http_bind_addr: cfg.http_bind_addr(), diff --git a/iroh-relay/src/server.rs b/iroh-relay/src/server.rs index 6f32da2863..8d58112be5 100644 --- a/iroh-relay/src/server.rs +++ b/iroh-relay/src/server.rs @@ -16,7 +16,7 @@ //! - HTTPS `/generate_204`: Used for net_report probes. //! - STUN: UDP port for STUN requests/responses. -use std::{fmt, future::Future, net::SocketAddr, pin::Pin, sync::Arc}; +use std::{fmt, future::Future, net::SocketAddr, num::NonZeroU32, pin::Pin, sync::Arc}; use anyhow::{anyhow, bail, Context, Result}; use futures_lite::StreamExt; @@ -140,12 +140,44 @@ pub struct TlsConfig { } /// Rate limits. +// TODO: accept_conn_limit and accept_conn_burst are not currently implemented. #[derive(Debug, Default)] pub struct Limits { /// Rate limit for accepting new connection. Unlimited if not set. pub accept_conn_limit: Option, /// Burst limit for accepting new connection. Unlimited if not set. pub accept_conn_burst: Option, + /// Rate limits for incoming traffic from a client connection. + pub client_rx: ClientConnRateLimit, +} + +/// Per-client rate limit configuration. +#[derive(Debug, Copy, Clone)] +pub struct ClientConnRateLimit { + /// Max number of bytes per second to read from the client connection. + /// + /// Defaults to 4KiB/s. + pub bytes_per_second: NonZeroU32, + /// Max number of bytes to read in a single burst. + /// + /// Defaults to 16 MiB. + pub max_burst_bytes: NonZeroU32, +} + +impl ClientConnRateLimit { + pub(super) const MAX: ClientConnRateLimit = ClientConnRateLimit { + bytes_per_second: NonZeroU32::MAX, + max_burst_bytes: NonZeroU32::MAX, + }; +} + +impl Default for ClientConnRateLimit { + fn default() -> Self { + Self { + bytes_per_second: NonZeroU32::try_from(1024 * 4).expect("nonzero"), + max_burst_bytes: NonZeroU32::try_from(1024 * 1024 * 16).expect("nonzero"), + } + } } /// TLS certificate configuration. @@ -255,6 +287,7 @@ impl Server { None => relay_config.http_bind_addr, }; let mut builder = http_server::ServerBuilder::new(relay_bind_addr) + .client_rx_ratelimit(relay_config.limits.client_rx) .headers(headers) .request_handler(Method::GET, "/", Box::new(root_handler)) .request_handler(Method::GET, "/index.html", Box::new(root_handler)) diff --git a/iroh-relay/src/server/actor.rs b/iroh-relay/src/server/actor.rs index fb3c69aedb..9d0f29867d 100644 --- a/iroh-relay/src/server/actor.rs +++ b/iroh-relay/src/server/actor.rs @@ -254,6 +254,7 @@ mod tests { server::{ client_conn::ClientConnConfig, streams::{MaybeTlsStream, RelayedStream}, + ClientConnRateLimit, }, }; @@ -268,6 +269,7 @@ mod tests { stream: RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)), write_timeout: Duration::from_secs(1), channel_capacity: 10, + rate_limit: ClientConnRateLimit::MAX, server_channel, }, Framed::new(test_io, DerpCodec), diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index 35343daf87..d52a0959a9 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -21,6 +21,7 @@ use crate::{ actor::{self, Packet}, metrics::Metrics, streams::RelayedStream, + ClientConnRateLimit, }, }; @@ -31,6 +32,7 @@ pub(super) struct ClientConnConfig { pub(super) stream: RelayedStream, pub(super) write_timeout: Duration, pub(super) channel_capacity: usize, + pub(super) rate_limit: ClientConnRateLimit, pub(super) server_channel: mpsc::Sender, } @@ -66,13 +68,13 @@ impl ClientConn { stream: io, write_timeout, channel_capacity, + rate_limit: rate_limit_config, server_channel, } = config; - // TODO make the values configurable - let bytes_per_second = NonZeroU32::new(1 << 12).expect("nonzero"); // 4 KiB / s - let burst_bytes = NonZeroU32::new(1 << 22).expect("nonzero"); // 4 MiB - let quota = governor::Quota::per_second(bytes_per_second).allow_burst(burst_bytes); + let quota = governor::Quota::per_second(rate_limit_config.bytes_per_second) + .allow_burst(rate_limit_config.max_burst_bytes); + // TODO: Allow creating this with mocked time for tests? let rate_limiter = governor::RateLimiter::direct(quota); let stream = RateLimitedRelayedStream::new(io, rate_limiter); @@ -383,7 +385,7 @@ impl Stream for RateLimitedRelayedStream { Ok(frame) => { // First we need to know how many bytes this frame consumes. let Ok(frame_len) = TryInto::::try_into(frame.len_with_header()) - .and_then(|len| TryInto::::try_into(len)) + .and_then(TryInto::::try_into) else { error!("frame len not NonZeroU32, is MAX_FRAME_SIZE too large?"); // Let this frame through anyway so to not completely break. diff --git a/iroh-relay/src/server/clients.rs b/iroh-relay/src/server/clients.rs index 8544069132..9b9f466da0 100644 --- a/iroh-relay/src/server/clients.rs +++ b/iroh-relay/src/server/clients.rs @@ -235,7 +235,10 @@ mod tests { use super::*; use crate::{ protos::relay::{recv_frame, DerpCodec, Frame, FrameType}, - server::streams::{MaybeTlsStream, RelayedStream}, + server::{ + streams::{MaybeTlsStream, RelayedStream}, + ClientConnRateLimit, + }, }; fn test_client_builder(key: NodeId) -> (ClientConnConfig, FramedRead) { @@ -247,6 +250,7 @@ mod tests { stream: RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)), write_timeout: Duration::from_secs(1), channel_capacity: 10, + rate_limit: ClientConnRateLimit::MAX, server_channel, }, FramedRead::new(test_io, DerpCodec), diff --git a/iroh-relay/src/server/http_server.rs b/iroh-relay/src/server/http_server.rs index 6a7e7649eb..503eca20e6 100644 --- a/iroh-relay/src/server/http_server.rs +++ b/iroh-relay/src/server/http_server.rs @@ -33,6 +33,7 @@ use crate::{ client_conn::ClientConnConfig, metrics::Metrics, streams::{MaybeTlsStream, RelayedStream}, + ClientConnRateLimit, }, }; @@ -73,7 +74,7 @@ fn downcast_upgrade(upgraded: Upgraded) -> Result<(MaybeTlsStream, Bytes)> { /// /// Created using [`ServerBuilder::spawn`]. #[derive(Debug)] -pub struct Server { +pub(super) struct Server { addr: SocketAddr, http_server_task: AbortOnDropHandle<()>, cancel_server_loop: CancellationToken, @@ -84,14 +85,14 @@ impl Server { /// /// The server runs in the background as several async tasks. This allows controlling /// the server, in particular it allows gracefully shutting down the server. - pub fn handle(&self) -> ServerHandle { + pub(super) fn handle(&self) -> ServerHandle { ServerHandle { cancel_token: self.cancel_server_loop.clone(), } } /// Closes the underlying relay server and the HTTP(S) server tasks. - pub fn shutdown(&self) { + pub(super) fn shutdown(&self) { self.cancel_server_loop.cancel(); } @@ -100,12 +101,12 @@ impl Server { /// This is the root of all the tasks for the server. Aborting it will abort all the /// other tasks for the server. Awaiting it will complete when all the server tasks are /// completed. - pub fn task_handle(&mut self) -> &mut AbortOnDropHandle<()> { + pub(super) fn task_handle(&mut self) -> &mut AbortOnDropHandle<()> { &mut self.http_server_task } /// Returns the local address of this server. - pub fn addr(&self) -> SocketAddr { + pub(super) fn addr(&self) -> SocketAddr { self.addr } } @@ -114,24 +115,24 @@ impl Server { /// /// This does not allow access to the task but can communicate with it. #[derive(Debug, Clone)] -pub struct ServerHandle { +pub(super) struct ServerHandle { cancel_token: CancellationToken, } impl ServerHandle { /// Gracefully shut down the server. - pub fn shutdown(&self) { + pub(super) fn shutdown(&self) { self.cancel_token.cancel() } } /// Configuration to use for the TLS connection #[derive(Debug, Clone)] -pub struct TlsConfig { +pub(super) struct TlsConfig { /// The server config - pub config: Arc, + pub(super) config: Arc, /// The kind - pub acceptor: TlsAcceptor, + pub(super) acceptor: TlsAcceptor, } /// Builder for the Relay HTTP Server. @@ -139,7 +140,7 @@ pub struct TlsConfig { /// Defaults to handling relay requests on the "/relay" (and "/derp" for backwards compatibility) endpoint. /// Other HTTP endpoints can be added using [`ServerBuilder::request_handler`]. #[derive(derive_more::Debug)] -pub struct ServerBuilder { +pub(super) struct ServerBuilder { /// The ip + port combination for this server. addr: SocketAddr, /// Optional tls configuration/TlsAcceptor combination. @@ -153,27 +154,42 @@ pub struct ServerBuilder { handlers: Handlers, /// Headers to use for HTTP responses. headers: HeaderMap, + /// Rate-limiting configuration for an individual client connection. + /// + /// Rate-limiting is enforced on received traffic from individual clients. This + /// configuration applies to a single client connection. + client_rx_ratelimit: ClientConnRateLimit, } impl ServerBuilder { /// Creates a new [ServerBuilder]. - pub fn new(addr: SocketAddr) -> Self { + pub(super) fn new(addr: SocketAddr) -> Self { Self { addr, tls_config: None, handlers: Default::default(), headers: HeaderMap::new(), + client_rx_ratelimit: ClientConnRateLimit::MAX, } } /// Serves all requests content using TLS. - pub fn tls_config(mut self, config: Option) -> Self { + pub(super) fn tls_config(mut self, config: Option) -> Self { self.tls_config = config; self } + /// Sets the per-client rate-limit configuration for incoming data. + /// + /// On each client connection the incoming data is rate-limited. By default + /// [`RateLimitConfig::MAX`] is enforced. + pub(super) fn client_rx_ratelimit(mut self, config: ClientConnRateLimit) -> Self { + self.client_rx_ratelimit = config; + self + } + /// Adds a custom handler for a specific Method & URI. - pub fn request_handler( + pub(super) fn request_handler( mut self, method: Method, uri_path: &'static str, @@ -184,7 +200,7 @@ impl ServerBuilder { } /// Adds HTTP headers to responses. - pub fn headers(mut self, headers: HeaderMap) -> Self { + pub(super) fn headers(mut self, headers: HeaderMap) -> Self { for (k, v) in headers.iter() { self.headers.insert(k.clone(), v.clone()); } @@ -192,13 +208,14 @@ impl ServerBuilder { } /// Builds and spawns an HTTP(S) Relay Server. - pub async fn spawn(self) -> Result { + pub(super) async fn spawn(self) -> Result { let server_task = ServerActorTask::spawn(); let service = RelayService::new( self.handlers, self.headers, server_task.server_channel.clone(), server_task.write_timeout, + self.client_rx_ratelimit, ); let addr = self.addr; @@ -280,6 +297,7 @@ struct Inner { headers: HeaderMap, server_channel: mpsc::Sender, write_timeout: Duration, + rate_limit: ClientConnRateLimit, } impl RelayService { @@ -497,6 +515,7 @@ impl Inner { stream: io, write_timeout: self.write_timeout, channel_capacity: PER_CLIENT_SEND_QUEUE_DEPTH, + rate_limit: self.rate_limit, server_channel: self.server_channel.clone(), }; trace!("accept: create client"); @@ -512,7 +531,7 @@ impl Inner { /// TLS Certificate Authority acceptor. #[derive(Clone, derive_more::Debug)] -pub enum TlsAcceptor { +pub(super) enum TlsAcceptor { /// Uses Let's Encrypt as the Certificate Authority. This is used in production. LetsEncrypt(#[debug("tokio_rustls_acme::AcmeAcceptor")] AcmeAcceptor), /// Manually added tls acceptor. Generally used for tests or for when we've passed in @@ -526,12 +545,14 @@ impl RelayService { headers: HeaderMap, server_channel: mpsc::Sender, write_timeout: Duration, + rate_limit: ClientConnRateLimit, ) -> Self { Self(Arc::new(Inner { handlers, headers, server_channel, write_timeout, + rate_limit, })) } @@ -893,6 +914,7 @@ mod tests { Default::default(), server_task.server_channel.clone(), server_task.write_timeout, + ClientConnRateLimit::MAX, ); // create client a and connect it to the server @@ -972,6 +994,7 @@ mod tests { Default::default(), server_task.server_channel.clone(), server_task.write_timeout, + ClientConnRateLimit::MAX, ); // create client a and connect it to the server From f2b5f1505b7455a53decf40ccbfd7d5decccc384 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 25 Nov 2024 13:00:09 +0100 Subject: [PATCH 03/18] Pin without macro --- iroh-relay/src/server/client_conn.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index d52a0959a9..234bf3ce8c 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -363,8 +363,7 @@ impl Stream for RateLimitedRelayedStream { loop { // If we have a delay installed, we need to await it. if let Some(ref mut wait_fut) = self.delay { - tokio::pin!(wait_fut); - match wait_fut.poll(cx) { + match Pin::new(wait_fut).poll(cx) { Poll::Ready(_) => { self.delay.take(); continue; From b83b2be8076afefcec3303ee1480e5e7a253af58 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 25 Nov 2024 16:30:56 +0100 Subject: [PATCH 04/18] Test the config file and transmutation --- iroh-relay/src/main.rs | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/iroh-relay/src/main.rs b/iroh-relay/src/main.rs index 86a9fe5d0b..fe6fdb9797 100644 --- a/iroh-relay/src/main.rs +++ b/iroh-relay/src/main.rs @@ -323,6 +323,10 @@ impl Config { } } + fn from_str(config: &str) -> Result { + toml::from_str(config).context("config must be valid toml") + } + async fn read_from_file(path: impl AsRef) -> Result { if !path.as_ref().is_file() { bail!("config-path must be a file"); @@ -330,9 +334,7 @@ impl Config { let config_ser = tokio::fs::read_to_string(&path) .await .context("unable to read config")?; - let config: Self = toml::from_str(&config_ser).context("config file must be valid toml")?; - - Ok(config) + Self::from_str(&config_ser) } } @@ -495,3 +497,33 @@ mod metrics { } } } + +#[cfg(test)] +mod tests { + use testresult::TestResult; + + use super::*; + + #[tokio::test] + async fn test_rate_limit_config() -> TestResult { + let config = " + [limits.client.rx] + bytes_per_second = 400 + max_burst_bytes = 800 + "; + let config = Config::from_str(config)?; + let relay_config = build_relay_config(config).await?; + + let relay = relay_config.relay.expect("no relay config"); + assert_eq!( + relay.limits.client_rx.bytes_per_second, + NonZeroU32::try_from(400).unwrap() + ); + assert_eq!( + relay.limits.client_rx.max_burst_bytes, + NonZeroU32::try_from(800).unwrap() + ); + + Ok(()) + } +} From f9aef81056c1be590a4e5dc86dfac58a407c94b4 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 25 Nov 2024 18:58:51 +0100 Subject: [PATCH 05/18] a more sane test. but it doesn't work :( --- iroh-relay/src/server/client_conn.rs | 84 ++++++++++++++-------------- 1 file changed, 41 insertions(+), 43 deletions(-) diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index 234bf3ce8c..0d8d19416a 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -703,62 +703,60 @@ mod tests { #[tokio::test] async fn test_rate_limit() -> TestResult { let _logging = iroh_test::logging::setup(); - let (_send_queue_s, send_queue_r) = mpsc::channel(10); - let (_disco_send_queue_s, disco_send_queue_r) = mpsc::channel(10); - let (_peer_gone_s, peer_gone_r) = mpsc::channel(10); - - let key = SecretKey::generate().public(); - let (io, io_rw) = tokio::io::duplex(1024); - let mut io_rw = Framed::new(io_rw, DerpCodec); - let (server_channel_s, mut server_channel_r) = mpsc::channel(10); - // We are only allowed to send 32 bytes per minute const LIMIT: u32 = 50; - let quota = governor::Quota::per_minute(NonZeroU32::try_from(LIMIT)?); - let limiter = governor::RateLimiter::direct(quota); - let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)); + const MAX_FRAMES: u32 = 100; - println!("-- create client conn"); - let actor = Actor { - stream: RateLimitedRelayedStream::new(stream, limiter), - timeout: Duration::from_secs(1), - send_queue: send_queue_r, - disco_send_queue: disco_send_queue_r, - node_gone: peer_gone_r, - key, - server_channel: server_channel_s, - preferred: true, - }; - - let done = CancellationToken::new(); - let io_done = done.clone(); + // Rate limiter allowing LIMIT bytes/s + let quota = governor::Quota::per_second(NonZeroU32::try_from(LIMIT)?); + let limiter = governor::RateLimiter::direct(quota); - info!("-- run client conn"); - let handle = tokio::task::spawn(async move { actor.run(io_done).await }); - let _handle = AbortOnDropHandle::new(handle); + // Build the rate limited stream. + let (io_read, io_write) = tokio::io::duplex((LIMIT * MAX_FRAMES) as _); + let mut frame_writer = Framed::new(io_write, DerpCodec); + let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io_read), DerpCodec)); + let mut stream = RateLimitedRelayedStream::new(stream, limiter); - // Prepare a packet to send. - let data = Bytes::from_static(b"hello world!"); + // Prepare a packet to send, assert its size. + let data = Bytes::from_static(b"hello world!!"); let target = SecretKey::generate().public(); - - // Assert the frame * 2 is over our limit. let frame = Frame::SendPacket { dst_key: target, packet: data.clone(), }; let frame_len = frame.len_with_header(); - assert!(frame_len * 2 > LIMIT as usize); - info!("-- send packet with {frame_len} bytes"); + assert_eq!(frame_len, LIMIT as usize); // Send a packet, it should arrive. - conn::send_packet(&mut io_rw, &None, target, data.clone()).await?; - let msg = server_channel_r.recv().await.context("actor died?")?; - assert!(matches!(msg, actor::Message::SendPacket { .. })); - - // Send another packet, it should not arrive - conn::send_packet(&mut io_rw, &None, target, data).await?; - let ret = tokio::time::timeout(Duration::from_secs(1), server_channel_r.recv()).await; - assert!(ret.is_err()); + info!("-- send packet"); + frame_writer.send(frame.clone()).await?; + frame_writer.flush().await?; + let recv_frame = tokio::time::timeout(Duration::from_millis(500), stream.next()) + .await + .expect("timeout") + .expect("option") + .expect("ok"); + assert_eq!(recv_frame, frame); + + // Next packet does not arrive. + info!("-- send packet"); + frame_writer.send(frame.clone()).await?; + frame_writer.flush().await?; + let res = tokio::time::timeout(Duration::from_millis(100), stream.next()).await; + assert!(res.is_err(), "expecting a timeout"); + info!("-- timeout happened"); + + // Wait long enough. + info!("-- sleep"); + tokio::time::sleep(Duration::from_secs(1)).await; + + // Packet arrives. + let recv_frame = tokio::time::timeout(Duration::from_millis(500), stream.next()) + .await + .expect("timeout") + .expect("option") + .expect("ok"); + assert_eq!(recv_frame, frame); Ok(()) } From 96320bb94c18347dbd9fee5bff72e5e5a252b5fa Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 26 Nov 2024 18:37:57 +0100 Subject: [PATCH 06/18] A bunch of (temp) tracing and fix the bug --- iroh-relay/src/server/client_conn.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index 0d8d19416a..0878babfeb 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -361,12 +361,20 @@ impl Stream for RateLimitedRelayedStream { cx: &mut std::task::Context<'_>, ) -> Poll> { loop { + trace!("loop tick"); // If we have a delay installed, we need to await it. if let Some(ref mut wait_fut) = self.delay { + trace!("polling delay future"); match Pin::new(wait_fut).poll(cx) { Poll::Ready(_) => { + trace!("delay future ready"); self.delay.take(); - continue; + // This future has already consumed the quota from the rate-limiter. + // So we must yield the buffered item right away. + match self.buf.take() { + Some(item) => return Poll::Ready(Some(item)), + None => continue, // unreachable + } } Poll::Pending => return Poll::Pending, } @@ -375,6 +383,7 @@ impl Stream for RateLimitedRelayedStream { if let Some(ref item) = self.buf { match item { Err(_) => { + trace!("error items are always yielded"); // Yielding errors is not rate-limited. match self.buf.take() { Some(item) => return Poll::Ready(Some(item)), @@ -393,11 +402,13 @@ impl Stream for RateLimitedRelayedStream { None => continue, // unreachable } }; + trace!("checking frame of size {frame_len}"); // Now check the rate limiter. match self.limiter.check_n(frame_len) { Ok(Ok(_)) => { // Item not rate-limited, yield it. + trace!("frame can be yielded"); match self.buf.take() { Some(frame) => return Poll::Ready(Some(frame)), None => continue, // unreachable @@ -405,6 +416,7 @@ impl Stream for RateLimitedRelayedStream { } Ok(Err(_until)) => { // Item is rate-limited, install a delay future. + trace!("frame is rate-limited, delay for {frame_len}"); let limiter = self.limiter.clone(); let fut = async move { limiter.until_n_ready(frame_len).await.ok(); @@ -425,8 +437,10 @@ impl Stream for RateLimitedRelayedStream { } } // If we have neither a delay future or a buffered item, poll for a new item. + trace!("polling inner"); match Pin::new(&mut self.inner).poll_next(cx) { Poll::Ready(Some(item)) => { + trace!("inner yielded item"); self.buf = Some(item); continue; } @@ -717,7 +731,7 @@ mod tests { let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io_read), DerpCodec)); let mut stream = RateLimitedRelayedStream::new(stream, limiter); - // Prepare a packet to send, assert its size. + // Prepare a frame to send, assert its size. let data = Bytes::from_static(b"hello world!!"); let target = SecretKey::generate().public(); let frame = Frame::SendPacket { @@ -727,7 +741,7 @@ mod tests { let frame_len = frame.len_with_header(); assert_eq!(frame_len, LIMIT as usize); - // Send a packet, it should arrive. + // Send a frame, it should arrive. info!("-- send packet"); frame_writer.send(frame.clone()).await?; frame_writer.flush().await?; @@ -738,7 +752,7 @@ mod tests { .expect("ok"); assert_eq!(recv_frame, frame); - // Next packet does not arrive. + // Next frame does not arrive. info!("-- send packet"); frame_writer.send(frame.clone()).await?; frame_writer.flush().await?; @@ -750,7 +764,7 @@ mod tests { info!("-- sleep"); tokio::time::sleep(Duration::from_secs(1)).await; - // Packet arrives. + // Frame arrives. let recv_frame = tokio::time::timeout(Duration::from_millis(500), stream.next()) .await .expect("timeout") From f979158999433e53a315e550f6d77f2f68f64f3b Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 26 Nov 2024 19:58:34 +0100 Subject: [PATCH 07/18] Store the state in fewer variables This makes misusing the state harder as the interactions are enforced more on the type-level. --- iroh-relay/src/server/client_conn.rs | 149 ++++++++++++--------------- 1 file changed, 66 insertions(+), 83 deletions(-) diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index 0878babfeb..1d28c64e35 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -4,6 +4,7 @@ use std::{future::Future, num::NonZeroU32, pin::Pin, sync::Arc, task::Poll, time use anyhow::{Context, Result}; use bytes::Bytes; +use futures_lite::FutureExt; use futures_sink::Sink; use futures_util::{SinkExt, Stream, StreamExt}; use iroh_base::key::NodeId; @@ -332,13 +333,21 @@ impl Actor { /// /// This potentially buffers one frame if the rate limiter does not allows this frame. /// While the frame is buffered the undernlying stream is no longer polled. -#[derive(derive_more::Debug)] +#[derive(Debug)] struct RateLimitedRelayedStream { inner: RelayedStream, limiter: Arc, - #[debug("Option>>>")] - delay: Option + Send + Sync>>>, - buf: Option>, + state: State, +} + +#[derive(derive_more::Debug)] +enum State { + #[debug("Blocked")] + Blocked { + delay: Pin + Send + Sync>>, + item: anyhow::Result, + }, + Ready, } impl RateLimitedRelayedStream { @@ -346,8 +355,7 @@ impl RateLimitedRelayedStream { Self { inner, limiter: Arc::new(limiter), - delay: None, - buf: None, + state: State::Ready, } } } @@ -361,92 +369,67 @@ impl Stream for RateLimitedRelayedStream { cx: &mut std::task::Context<'_>, ) -> Poll> { loop { - trace!("loop tick"); - // If we have a delay installed, we need to await it. - if let Some(ref mut wait_fut) = self.delay { - trace!("polling delay future"); - match Pin::new(wait_fut).poll(cx) { - Poll::Ready(_) => { - trace!("delay future ready"); - self.delay.take(); - // This future has already consumed the quota from the rate-limiter. - // So we must yield the buffered item right away. - match self.buf.take() { - Some(item) => return Poll::Ready(Some(item)), - None => continue, // unreachable + match &mut self.state { + State::Ready => { + // Poll inner for a new item. + match Pin::new(&mut self.inner).poll_next(cx) { + Poll::Ready(Some(item)) => { + match &item { + Ok(frame) => { + // How many bytes does this frame consume? + let Ok(frame_len) = + TryInto::::try_into(frame.len_with_header()) + .and_then(TryInto::::try_into) + else { + error!("frame len not NonZeroU32, is MAX_FRAME_SIZE too large?"); + // Let this frame through so to not completely break. + return Poll::Ready(Some(item)); + }; + + match self.limiter.check_n(frame_len) { + Ok(Ok(_)) => return Poll::Ready(Some(item)), + Ok(Err(_)) => { + // Item is rate-limited. + let limiter = self.limiter.clone(); + let delay = Box::pin(async move { + limiter.until_n_ready(frame_len).await.ok(); + }); + self.state = State::Blocked { delay, item }; + continue; + } + Err(_insufficient_capacity) => { + error!("frame larger than bucket capacity"); + // Let this frame through so to not completely break. + return Poll::Ready(Some(item)); + } + } + } + Err(_) => { + // Yielding errors is not rate-limited. + return Poll::Ready(Some(item)); + } + } } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, } - Poll::Pending => return Poll::Pending, } - } - // If we have an item buffered, check if we can yield it. - if let Some(ref item) = self.buf { - match item { - Err(_) => { - trace!("error items are always yielded"); - // Yielding errors is not rate-limited. - match self.buf.take() { - Some(item) => return Poll::Ready(Some(item)), - None => continue, // unreachable - } - } - Ok(frame) => { - // First we need to know how many bytes this frame consumes. - let Ok(frame_len) = TryInto::::try_into(frame.len_with_header()) - .and_then(TryInto::::try_into) - else { - error!("frame len not NonZeroU32, is MAX_FRAME_SIZE too large?"); - // Let this frame through anyway so to not completely break. - match self.buf.take() { - Some(item) => return Poll::Ready(Some(item)), - None => continue, // unreachable - } - }; - trace!("checking frame of size {frame_len}"); - - // Now check the rate limiter. - match self.limiter.check_n(frame_len) { - Ok(Ok(_)) => { - // Item not rate-limited, yield it. - trace!("frame can be yielded"); - match self.buf.take() { - Some(frame) => return Poll::Ready(Some(frame)), - None => continue, // unreachable - } - } - Ok(Err(_until)) => { - // Item is rate-limited, install a delay future. - trace!("frame is rate-limited, delay for {frame_len}"); - let limiter = self.limiter.clone(); - let fut = async move { - limiter.until_n_ready(frame_len).await.ok(); - }; - self.delay = Some(Box::pin(fut)); - continue; - } - Err(_insufficient_capacity) => { - error!("frame larger than bucket capacity, accepting frame"); - // Let this frame through since this is misconfigured. - match self.buf.take() { - Some(item) => return Poll::Ready(Some(item)), - None => continue, // unreachable + State::Blocked { delay, .. } => { + match delay.poll(cx) { + Poll::Ready(_) => { + match std::mem::replace(&mut self.state, State::Ready) { + State::Ready => continue, // unreachable + State::Blocked { item, .. } => { + // Yield the item directly, rate-limit has already been + // accounted for by awaiting the future. + return Poll::Ready(Some(item)); } } } + Poll::Pending => return Poll::Pending, } } } - // If we have neither a delay future or a buffered item, poll for a new item. - trace!("polling inner"); - match Pin::new(&mut self.inner).poll_next(cx) { - Poll::Ready(Some(item)) => { - trace!("inner yielded item"); - self.buf = Some(item); - continue; - } - Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => return Poll::Pending, - } } } } From 39a53e485f4eca5e8c4b8f4ae0427fa2e1553294 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 27 Nov 2024 09:48:11 +0100 Subject: [PATCH 08/18] some docs --- iroh-relay/src/server/client_conn.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index 1d28c64e35..effe0f6cd5 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -344,7 +344,9 @@ struct RateLimitedRelayedStream { enum State { #[debug("Blocked")] Blocked { + /// Future which will complete when the item can be yielded. delay: Pin + Send + Sync>>, + /// Item to yield when the `delay` future completes. item: anyhow::Result, }, Ready, From 34d972134bfd35a014a7f51d4109226ba022d97e Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 27 Nov 2024 12:01:30 +0100 Subject: [PATCH 09/18] Set defaults to effectively unlimited --- iroh-relay/src/main.rs | 12 ++++++++++++ iroh-relay/src/server.rs | 9 +++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/iroh-relay/src/main.rs b/iroh-relay/src/main.rs index fe6fdb9797..0e9f33c923 100644 --- a/iroh-relay/src/main.rs +++ b/iroh-relay/src/main.rs @@ -526,4 +526,16 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_rate_limit_default() -> TestResult { + let config = Config::from_str("")?; + let relay_config = build_relay_config(config).await?; + + let relay = relay_config.relay.expect("no relay config"); + assert_eq!(relay.limits.client_rx.bytes_per_second, NonZeroU32::MAX); + assert_eq!(relay.limits.client_rx.max_burst_bytes, NonZeroU32::MAX); + + Ok(()) + } } diff --git a/iroh-relay/src/server.rs b/iroh-relay/src/server.rs index 8d58112be5..a511646f07 100644 --- a/iroh-relay/src/server.rs +++ b/iroh-relay/src/server.rs @@ -156,11 +156,11 @@ pub struct Limits { pub struct ClientConnRateLimit { /// Max number of bytes per second to read from the client connection. /// - /// Defaults to 4KiB/s. + /// Defaults to [`NonZeroU32::MAX`], effectively unlimited. pub bytes_per_second: NonZeroU32, /// Max number of bytes to read in a single burst. /// - /// Defaults to 16 MiB. + /// Defaults to [`NonZeroU32::MAX`], effectively unlimited. pub max_burst_bytes: NonZeroU32, } @@ -173,10 +173,7 @@ impl ClientConnRateLimit { impl Default for ClientConnRateLimit { fn default() -> Self { - Self { - bytes_per_second: NonZeroU32::try_from(1024 * 4).expect("nonzero"), - max_burst_bytes: NonZeroU32::try_from(1024 * 1024 * 16).expect("nonzero"), - } + Self::MAX } } From b1fe70854746991fb353c846b76339ae99db649b Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 27 Nov 2024 13:00:43 +0100 Subject: [PATCH 10/18] Add metrics --- iroh-relay/src/server/client_conn.rs | 16 ++++++++++++++++ iroh-relay/src/server/metrics.rs | 11 +++++++++++ 2 files changed, 27 insertions(+) diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index effe0f6cd5..2c68539178 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -338,6 +338,8 @@ struct RateLimitedRelayedStream { inner: RelayedStream, limiter: Arc, state: State, + /// Keeps track if this stream was ever rate-limited. + limited_once: bool, } #[derive(derive_more::Debug)] @@ -358,6 +360,19 @@ impl RateLimitedRelayedStream { inner, limiter: Arc::new(limiter), state: State::Ready, + limited_once: false, + } + } +} + +impl RateLimitedRelayedStream { + /// Records metrics about being rate-limited. + fn record_rate_limited(&mut self) { + // TODO: add a label for the frame type. + inc!(Metrics, frames_rx_ratelimited_total); + if !self.limited_once { + inc!(Metrics, conns_rx_ratelimited_total); + self.limited_once = true; } } } @@ -392,6 +407,7 @@ impl Stream for RateLimitedRelayedStream { Ok(Ok(_)) => return Poll::Ready(Some(item)), Ok(Err(_)) => { // Item is rate-limited. + self.record_rate_limited(); let limiter = self.limiter.clone(); let delay = Box::pin(async move { limiter.until_n_ready(frame_len).await.ok(); diff --git a/iroh-relay/src/server/metrics.rs b/iroh-relay/src/server/metrics.rs index 970294ec73..93e8247725 100644 --- a/iroh-relay/src/server/metrics.rs +++ b/iroh-relay/src/server/metrics.rs @@ -42,6 +42,11 @@ pub struct Metrics { /// Number of `FrameType::Unknown` received pub unknown_frames: Counter, + /// Number of frames received from client connection which have been rate-limited. + pub frames_rx_ratelimited_total: Counter, + /// Number of client connections which have had any frames rate-limited. + pub conns_rx_ratelimited_total: Counter, + /* * Metrics about peers */ @@ -91,6 +96,12 @@ impl Default for Metrics { got_ping: Counter::new("Number of times the server has received a Ping from a client."), sent_pong: Counter::new("Number of times the server has sent a Pong to a client."), unknown_frames: Counter::new("Number of unknown frames sent to this server."), + frames_rx_ratelimited_total: Counter::new( + "Number of frames received from client connection which have been rate-limited.", + ), + conns_rx_ratelimited_total: Counter::new( + "Number of client connections which have had any frames rate-limited.", + ), /* * Metrics about peers From 09b97d991b5d147beb8a102dd96790eed7cd393a Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 27 Nov 2024 14:21:59 +0100 Subject: [PATCH 11/18] Remove obsolete todo --- iroh-relay/src/server/client_conn.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index 2c68539178..66435a7bda 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -75,7 +75,6 @@ impl ClientConn { let quota = governor::Quota::per_second(rate_limit_config.bytes_per_second) .allow_burst(rate_limit_config.max_burst_bytes); - // TODO: Allow creating this with mocked time for tests? let rate_limiter = governor::RateLimiter::direct(quota); let stream = RateLimitedRelayedStream::new(io, rate_limiter); From b64c003b29cf5d171d200c9b2fe094a56d88db18 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 27 Nov 2024 14:27:10 +0100 Subject: [PATCH 12/18] make error message clearer --- iroh-relay/src/server/client_conn.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index 66435a7bda..c4208c6a43 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -415,7 +415,11 @@ impl Stream for RateLimitedRelayedStream { continue; } Err(_insufficient_capacity) => { - error!("frame larger than bucket capacity"); + error!( + "frame larger than bucket capacity: \ + configuration error: \ + max_burst_bytes < MAX_FRAME_SIZE?" + ); // Let this frame through so to not completely break. return Poll::Ready(Some(item)); } From f41b75b35cf6d8204ac6ecead21ecbaef010b814 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 27 Nov 2024 15:22:10 +0100 Subject: [PATCH 13/18] use unreachable!() --- iroh-relay/src/server/client_conn.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index c4208c6a43..5a8e8e9280 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -439,7 +439,7 @@ impl Stream for RateLimitedRelayedStream { match delay.poll(cx) { Poll::Ready(_) => { match std::mem::replace(&mut self.state, State::Ready) { - State::Ready => continue, // unreachable + State::Ready => unreachable!(), State::Blocked { item, .. } => { // Yield the item directly, rate-limit has already been // accounted for by awaiting the future. From fafe08330c81ce422a3bffd6ff006c8282f06ab1 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 27 Nov 2024 15:34:01 +0100 Subject: [PATCH 14/18] fix doc link --- iroh-relay/src/server/http_server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-relay/src/server/http_server.rs b/iroh-relay/src/server/http_server.rs index 2e613c07f7..04e3ce9f79 100644 --- a/iroh-relay/src/server/http_server.rs +++ b/iroh-relay/src/server/http_server.rs @@ -184,7 +184,7 @@ impl ServerBuilder { /// Sets the per-client rate-limit configuration for incoming data. /// /// On each client connection the incoming data is rate-limited. By default - /// [`RateLimitConfig::MAX`] is enforced. + /// [`ClientConnRateLimit::MAX`] is enforced. pub(super) fn client_rx_ratelimit(mut self, config: ClientConnRateLimit) -> Self { self.client_rx_ratelimit = config; self From 2e29600c48a03ec82f3db1bd46345e785e5be5d1 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 27 Nov 2024 15:35:41 +0100 Subject: [PATCH 15/18] This is only used in the server feature --- iroh-relay/src/protos/relay.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/iroh-relay/src/protos/relay.rs b/iroh-relay/src/protos/relay.rs index 6145b8d6c2..4997dc2102 100644 --- a/iroh-relay/src/protos/relay.rs +++ b/iroh-relay/src/protos/relay.rs @@ -288,6 +288,7 @@ impl Frame { } /// Serialized length with frame header. + #[cfg(feature = "server")] pub(crate) fn len_with_header(&self) -> usize { self.len() + HEADER_LEN } From e2f55dd03d65323ec667df843066dd0a5746a015 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 27 Nov 2024 18:12:09 +0100 Subject: [PATCH 16/18] Option spagetti to make it possible to disable the rate limiter Enjoy --- iroh-relay/src/main.rs | 39 +++++++++++++----------- iroh-relay/src/server.rs | 25 ++++------------ iroh-relay/src/server/actor.rs | 3 +- iroh-relay/src/server/client_conn.rs | 44 ++++++++++++++++------------ iroh-relay/src/server/clients.rs | 7 ++--- iroh-relay/src/server/http_server.rs | 16 +++++----- 6 files changed, 63 insertions(+), 71 deletions(-) diff --git a/iroh-relay/src/main.rs b/iroh-relay/src/main.rs index 0e9f33c923..94d81f095f 100644 --- a/iroh-relay/src/main.rs +++ b/iroh-relay/src/main.rs @@ -5,7 +5,6 @@ use std::{ net::{Ipv6Addr, SocketAddr}, - num::NonZeroU32, path::{Path, PathBuf}, }; @@ -411,20 +410,25 @@ async fn build_relay_config(cfg: Config) -> Result { let client_rx = match &limits.client { Some(PerClientRateLimitConfig { rx: Some(rx) }) => { - let mut cfg = ClientConnRateLimit::default(); - if let Some(bps) = rx.bytes_per_second { - let v = NonZeroU32::try_from(bps) - .context("bytes_per_second must be non-zero u32")?; - cfg.bytes_per_second = v; + if rx.bytes_per_second.is_none() && rx.max_burst_bytes.is_some() { + bail!("bytes_per_seconds must be specified to enable the rate-limiter"); } - if let Some(burst) = rx.max_burst_bytes { - let v = NonZeroU32::try_from(burst) - .context("max_burst_bytes must be non-zero u32")?; - cfg.max_burst_bytes = v; + match rx.bytes_per_second { + Some(bps) => Some(ClientConnRateLimit { + bytes_per_second: bps + .try_into() + .context("bytes_per_second must be non-zero u32")?, + max_burst_bytes: rx + .max_burst_bytes + .map(|v| { + v.try_into().context("max_burst_bytes must be non-zero u32") + }) + .transpose()?, + }), + None => None, } - cfg } - Some(PerClientRateLimitConfig { rx: None }) | None => Default::default(), + Some(PerClientRateLimitConfig { rx: None }) | None => None, }; relay::Limits { accept_conn_limit: limits.accept_conn_limit, @@ -500,6 +504,8 @@ mod metrics { #[cfg(test)] mod tests { + use std::num::NonZeroU32; + use testresult::TestResult; use super::*; @@ -516,12 +522,12 @@ mod tests { let relay = relay_config.relay.expect("no relay config"); assert_eq!( - relay.limits.client_rx.bytes_per_second, + relay.limits.client_rx.expect("ratelimit").bytes_per_second, NonZeroU32::try_from(400).unwrap() ); assert_eq!( - relay.limits.client_rx.max_burst_bytes, - NonZeroU32::try_from(800).unwrap() + relay.limits.client_rx.expect("ratelimit").max_burst_bytes, + Some(NonZeroU32::try_from(800).unwrap()) ); Ok(()) @@ -533,8 +539,7 @@ mod tests { let relay_config = build_relay_config(config).await?; let relay = relay_config.relay.expect("no relay config"); - assert_eq!(relay.limits.client_rx.bytes_per_second, NonZeroU32::MAX); - assert_eq!(relay.limits.client_rx.max_burst_bytes, NonZeroU32::MAX); + assert!(relay.limits.client_rx.is_none()); Ok(()) } diff --git a/iroh-relay/src/server.rs b/iroh-relay/src/server.rs index c9d9de4cca..80fac29e9e 100644 --- a/iroh-relay/src/server.rs +++ b/iroh-relay/src/server.rs @@ -148,33 +148,16 @@ pub struct Limits { /// Burst limit for accepting new connection. Unlimited if not set. pub accept_conn_burst: Option, /// Rate limits for incoming traffic from a client connection. - pub client_rx: ClientConnRateLimit, + pub client_rx: Option, } /// Per-client rate limit configuration. #[derive(Debug, Copy, Clone)] pub struct ClientConnRateLimit { /// Max number of bytes per second to read from the client connection. - /// - /// Defaults to [`NonZeroU32::MAX`], effectively unlimited. pub bytes_per_second: NonZeroU32, /// Max number of bytes to read in a single burst. - /// - /// Defaults to [`NonZeroU32::MAX`], effectively unlimited. - pub max_burst_bytes: NonZeroU32, -} - -impl ClientConnRateLimit { - pub(super) const MAX: ClientConnRateLimit = ClientConnRateLimit { - bytes_per_second: NonZeroU32::MAX, - max_burst_bytes: NonZeroU32::MAX, - }; -} - -impl Default for ClientConnRateLimit { - fn default() -> Self { - Self::MAX - } + pub max_burst_bytes: Option, } /// TLS certificate configuration. @@ -284,12 +267,14 @@ impl Server { None => relay_config.http_bind_addr, }; let mut builder = http_server::ServerBuilder::new(relay_bind_addr) - .client_rx_ratelimit(relay_config.limits.client_rx) .headers(headers) .request_handler(Method::GET, "/", Box::new(root_handler)) .request_handler(Method::GET, "/index.html", Box::new(root_handler)) .request_handler(Method::GET, RELAY_PROBE_PATH, Box::new(probe_handler)) .request_handler(Method::GET, "/robots.txt", Box::new(robots_handler)); + if let Some(cfg) = relay_config.limits.client_rx { + builder = builder.client_rx_ratelimit(cfg); + } let http_addr = match relay_config.tls { Some(tls_config) => { let server_config = rustls::ServerConfig::builder_with_provider(Arc::new( diff --git a/iroh-relay/src/server/actor.rs b/iroh-relay/src/server/actor.rs index 9d0f29867d..9ef38a0964 100644 --- a/iroh-relay/src/server/actor.rs +++ b/iroh-relay/src/server/actor.rs @@ -254,7 +254,6 @@ mod tests { server::{ client_conn::ClientConnConfig, streams::{MaybeTlsStream, RelayedStream}, - ClientConnRateLimit, }, }; @@ -269,7 +268,7 @@ mod tests { stream: RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)), write_timeout: Duration::from_secs(1), channel_capacity: 10, - rate_limit: ClientConnRateLimit::MAX, + rate_limit: None, server_channel, }, Framed::new(test_io, DerpCodec), diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index 5a8e8e9280..2b1122a3ad 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -33,7 +33,7 @@ pub(super) struct ClientConnConfig { pub(super) stream: RelayedStream, pub(super) write_timeout: Duration, pub(super) channel_capacity: usize, - pub(super) rate_limit: ClientConnRateLimit, + pub(super) rate_limit: Option, pub(super) server_channel: mpsc::Sender, } @@ -69,13 +69,17 @@ impl ClientConn { stream: io, write_timeout, channel_capacity, - rate_limit: rate_limit_config, + rate_limit, server_channel, } = config; - let quota = governor::Quota::per_second(rate_limit_config.bytes_per_second) - .allow_burst(rate_limit_config.max_burst_bytes); - let rate_limiter = governor::RateLimiter::direct(quota); + let rate_limiter = rate_limit.map(|cfg| { + let mut quota = governor::Quota::per_second(cfg.bytes_per_second); + if let Some(max_burst) = cfg.max_burst_bytes { + quota = quota.allow_burst(max_burst); + } + governor::RateLimiter::direct(quota) + }); let stream = RateLimitedRelayedStream::new(io, rate_limiter); let done = CancellationToken::new(); @@ -335,7 +339,7 @@ impl Actor { #[derive(Debug)] struct RateLimitedRelayedStream { inner: RelayedStream, - limiter: Arc, + limiter: Option>, state: State, /// Keeps track if this stream was ever rate-limited. limited_once: bool, @@ -354,10 +358,10 @@ enum State { } impl RateLimitedRelayedStream { - fn new(inner: RelayedStream, limiter: governor::DefaultDirectRateLimiter) -> Self { + fn new(inner: RelayedStream, limiter: Option) -> Self { Self { inner, - limiter: Arc::new(limiter), + limiter: limiter.map(Arc::new), state: State::Ready, limited_once: false, } @@ -384,6 +388,10 @@ impl Stream for RateLimitedRelayedStream { mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { + let Some(limiter) = self.limiter.clone() else { + // If there is no rate-limiter directly poll the inner. + return Pin::new(&mut self.inner).poll_next(cx); + }; loop { match &mut self.state { State::Ready => { @@ -402,14 +410,16 @@ impl Stream for RateLimitedRelayedStream { return Poll::Ready(Some(item)); }; - match self.limiter.check_n(frame_len) { + match limiter.check_n(frame_len) { Ok(Ok(_)) => return Poll::Ready(Some(item)), Ok(Err(_)) => { // Item is rate-limited. self.record_rate_limited(); - let limiter = self.limiter.clone(); - let delay = Box::pin(async move { - limiter.until_n_ready(frame_len).await.ok(); + let delay = Box::pin({ + let limiter = limiter.clone(); + async move { + limiter.until_n_ready(frame_len).await.ok(); + } }); self.state = State::Blocked { delay, item }; continue; @@ -509,12 +519,10 @@ mod tests { let (io, io_rw) = tokio::io::duplex(1024); let mut io_rw = Framed::new(io_rw, DerpCodec); let (server_channel_s, mut server_channel_r) = mpsc::channel(10); - let quota = governor::Quota::per_second(NonZeroU32::MAX); - let limiter = governor::RateLimiter::direct(quota); let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)); let actor = Actor { - stream: RateLimitedRelayedStream::new(stream, limiter), + stream: RateLimitedRelayedStream::new(stream, None), timeout: Duration::from_secs(1), send_queue: send_queue_r, disco_send_queue: disco_send_queue_r, @@ -651,13 +659,11 @@ mod tests { let (io, io_rw) = tokio::io::duplex(1024); let mut io_rw = Framed::new(io_rw, DerpCodec); let (server_channel_s, mut server_channel_r) = mpsc::channel(10); - let quota = governor::Quota::per_second(NonZeroU32::MAX); - let limiter = governor::RateLimiter::direct(quota); let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)); println!("-- create client conn"); let actor = Actor { - stream: RateLimitedRelayedStream::new(stream, limiter), + stream: RateLimitedRelayedStream::new(stream, None), timeout: Duration::from_secs(1), send_queue: send_queue_r, disco_send_queue: disco_send_queue_r, @@ -733,7 +739,7 @@ mod tests { let (io_read, io_write) = tokio::io::duplex((LIMIT * MAX_FRAMES) as _); let mut frame_writer = Framed::new(io_write, DerpCodec); let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io_read), DerpCodec)); - let mut stream = RateLimitedRelayedStream::new(stream, limiter); + let mut stream = RateLimitedRelayedStream::new(stream, Some(limiter)); // Prepare a frame to send, assert its size. let data = Bytes::from_static(b"hello world!!"); diff --git a/iroh-relay/src/server/clients.rs b/iroh-relay/src/server/clients.rs index 9b9f466da0..4c08fd3619 100644 --- a/iroh-relay/src/server/clients.rs +++ b/iroh-relay/src/server/clients.rs @@ -235,10 +235,7 @@ mod tests { use super::*; use crate::{ protos::relay::{recv_frame, DerpCodec, Frame, FrameType}, - server::{ - streams::{MaybeTlsStream, RelayedStream}, - ClientConnRateLimit, - }, + server::streams::{MaybeTlsStream, RelayedStream}, }; fn test_client_builder(key: NodeId) -> (ClientConnConfig, FramedRead) { @@ -250,7 +247,7 @@ mod tests { stream: RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)), write_timeout: Duration::from_secs(1), channel_capacity: 10, - rate_limit: ClientConnRateLimit::MAX, + rate_limit: None, server_channel, }, FramedRead::new(test_io, DerpCodec), diff --git a/iroh-relay/src/server/http_server.rs b/iroh-relay/src/server/http_server.rs index 04e3ce9f79..884b83a2f6 100644 --- a/iroh-relay/src/server/http_server.rs +++ b/iroh-relay/src/server/http_server.rs @@ -160,7 +160,7 @@ pub(super) struct ServerBuilder { /// /// Rate-limiting is enforced on received traffic from individual clients. This /// configuration applies to a single client connection. - client_rx_ratelimit: ClientConnRateLimit, + client_rx_ratelimit: Option, } impl ServerBuilder { @@ -171,7 +171,7 @@ impl ServerBuilder { tls_config: None, handlers: Default::default(), headers: HeaderMap::new(), - client_rx_ratelimit: ClientConnRateLimit::MAX, + client_rx_ratelimit: None, } } @@ -184,9 +184,9 @@ impl ServerBuilder { /// Sets the per-client rate-limit configuration for incoming data. /// /// On each client connection the incoming data is rate-limited. By default - /// [`ClientConnRateLimit::MAX`] is enforced. + /// no rate limit is enforced. pub(super) fn client_rx_ratelimit(mut self, config: ClientConnRateLimit) -> Self { - self.client_rx_ratelimit = config; + self.client_rx_ratelimit = Some(config); self } @@ -299,7 +299,7 @@ struct Inner { headers: HeaderMap, server_channel: mpsc::Sender, write_timeout: Duration, - rate_limit: ClientConnRateLimit, + rate_limit: Option, } impl RelayService { @@ -547,7 +547,7 @@ impl RelayService { headers: HeaderMap, server_channel: mpsc::Sender, write_timeout: Duration, - rate_limit: ClientConnRateLimit, + rate_limit: Option, ) -> Self { Self(Arc::new(Inner { handlers, @@ -916,7 +916,7 @@ mod tests { Default::default(), server_task.server_channel.clone(), server_task.write_timeout, - ClientConnRateLimit::MAX, + None, ); // create client a and connect it to the server @@ -996,7 +996,7 @@ mod tests { Default::default(), server_task.server_channel.clone(), server_task.write_timeout, - ClientConnRateLimit::MAX, + None, ); // create client a and connect it to the server From f5bf54d78b402966fd06c7bc3c0f9def432daf08 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 27 Nov 2024 18:19:19 +0100 Subject: [PATCH 17/18] Only clone when really needed --- iroh-relay/src/server/client_conn.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index 2b1122a3ad..14590880ff 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -388,10 +388,11 @@ impl Stream for RateLimitedRelayedStream { mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - let Some(limiter) = self.limiter.clone() else { + let Some(ref limiter) = self.limiter else { // If there is no rate-limiter directly poll the inner. return Pin::new(&mut self.inner).poll_next(cx); }; + let limiter = limiter.clone(); loop { match &mut self.state { State::Ready => { From 49ccd9812aa1b656078a6c36624f715f4968fda7 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 27 Nov 2024 18:23:53 +0100 Subject: [PATCH 18/18] Introduce RateLimitedRelayedStream::unlimited constructor That *is* a bit nicer to use. --- iroh-relay/src/server/client_conn.rs | 36 ++++++++++++++++++---------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index 14590880ff..afb4ed1efd 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -73,14 +73,17 @@ impl ClientConn { server_channel, } = config; - let rate_limiter = rate_limit.map(|cfg| { - let mut quota = governor::Quota::per_second(cfg.bytes_per_second); - if let Some(max_burst) = cfg.max_burst_bytes { - quota = quota.allow_burst(max_burst); + let stream = match rate_limit { + Some(cfg) => { + let mut quota = governor::Quota::per_second(cfg.bytes_per_second); + if let Some(max_burst) = cfg.max_burst_bytes { + quota = quota.allow_burst(max_burst); + } + let limiter = governor::RateLimiter::direct(quota); + RateLimitedRelayedStream::new(io, limiter) } - governor::RateLimiter::direct(quota) - }); - let stream = RateLimitedRelayedStream::new(io, rate_limiter); + None => RateLimitedRelayedStream::unlimited(io), + }; let done = CancellationToken::new(); let client_id = (key, conn_num); @@ -358,10 +361,19 @@ enum State { } impl RateLimitedRelayedStream { - fn new(inner: RelayedStream, limiter: Option) -> Self { + fn new(inner: RelayedStream, limiter: governor::DefaultDirectRateLimiter) -> Self { + Self { + inner, + limiter: Some(Arc::new(limiter)), + state: State::Ready, + limited_once: false, + } + } + + fn unlimited(inner: RelayedStream) -> Self { Self { inner, - limiter: limiter.map(Arc::new), + limiter: None, state: State::Ready, limited_once: false, } @@ -523,7 +535,7 @@ mod tests { let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)); let actor = Actor { - stream: RateLimitedRelayedStream::new(stream, None), + stream: RateLimitedRelayedStream::unlimited(stream), timeout: Duration::from_secs(1), send_queue: send_queue_r, disco_send_queue: disco_send_queue_r, @@ -664,7 +676,7 @@ mod tests { println!("-- create client conn"); let actor = Actor { - stream: RateLimitedRelayedStream::new(stream, None), + stream: RateLimitedRelayedStream::unlimited(stream), timeout: Duration::from_secs(1), send_queue: send_queue_r, disco_send_queue: disco_send_queue_r, @@ -740,7 +752,7 @@ mod tests { let (io_read, io_write) = tokio::io::duplex((LIMIT * MAX_FRAMES) as _); let mut frame_writer = Framed::new(io_write, DerpCodec); let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io_read), DerpCodec)); - let mut stream = RateLimitedRelayedStream::new(stream, Some(limiter)); + let mut stream = RateLimitedRelayedStream::new(stream, limiter); // Prepare a frame to send, assert its size. let data = Bytes::from_static(b"hello world!!");