From cc0d2e95631475601f215b58b90a8105519162cb Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Sat, 6 Apr 2024 18:40:49 -0700 Subject: [PATCH] Allocate Incoming response buffers as needed Threading buffers through alongside the `Incoming` does not reduce the number of allocations. --- quinn/src/endpoint.rs | 35 +++++++++++------------------- quinn/src/incoming.rs | 50 +++++++++++-------------------------------- 2 files changed, 26 insertions(+), 59 deletions(-) diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index b886f1b37..6d865d8a6 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -359,12 +359,9 @@ pub(crate) struct EndpointInner { } impl EndpointInner { - pub(crate) fn accept( - &self, - incoming: proto::Incoming, - mut response_buffer: BytesMut, - ) -> Result { + pub(crate) fn accept(&self, incoming: proto::Incoming) -> Result { let mut state = self.state.lock().unwrap(); + let mut response_buffer = BytesMut::new(); match state .inner .accept(incoming, Instant::now(), &mut response_buffer) @@ -383,25 +380,19 @@ impl EndpointInner { } } - pub(crate) fn refuse(&self, incoming: proto::Incoming, mut response_buffer: BytesMut) { + pub(crate) fn refuse(&self, incoming: proto::Incoming) { let mut state = self.state.lock().unwrap(); + let mut response_buffer = BytesMut::new(); let transmit = state.inner.refuse(incoming, &mut response_buffer); state.transmit_state.respond(transmit, response_buffer); } - pub(crate) fn retry( - &self, - incoming: proto::Incoming, - mut response_buffer: BytesMut, - ) -> Result<(), (proto::RetryError, BytesMut)> { + pub(crate) fn retry(&self, incoming: proto::Incoming) -> Result<(), proto::RetryError> { let mut state = self.state.lock().unwrap(); - match state.inner.retry(incoming, &mut response_buffer) { - Ok(transmit) => { - state.transmit_state.respond(transmit, response_buffer); - Ok(()) - } - Err(e) => Err((e, response_buffer)), - } + let mut response_buffer = BytesMut::new(); + let transmit = state.inner.retry(incoming, &mut response_buffer)?; + state.transmit_state.respond(transmit, response_buffer); + Ok(()) } } @@ -410,7 +401,7 @@ pub(crate) struct State { socket: Arc, inner: proto::Endpoint, transmit_state: TransmitState, - incoming: VecDeque<(proto::Incoming, BytesMut)>, + incoming: VecDeque, driver: Option, ipv6: bool, connections: ConnectionSet, @@ -464,7 +455,7 @@ impl State { ) { Some(DatagramEvent::NewConnection(incoming)) => { if self.incoming.len() < MAX_INCOMING_CONNECTIONS { - self.incoming.push_back((incoming, response_buffer)); + self.incoming.push_back(incoming); } else { let transmit = self.inner.refuse(incoming, &mut response_buffer); @@ -707,10 +698,10 @@ impl<'a> Future for Accept<'a> { if endpoint.driver_lost { return Poll::Ready(None); } - if let Some((incoming, response_buffer)) = endpoint.incoming.pop_front() { + if let Some(incoming) = endpoint.incoming.pop_front() { // Release the mutex lock on endpoint so cloning it doesn't deadlock drop(endpoint); - let incoming = Incoming::new(incoming, this.endpoint.inner.clone(), response_buffer); + let incoming = Incoming::new(incoming, this.endpoint.inner.clone()); return Poll::Ready(Some(incoming)); } if endpoint.connections.close.is_some() { diff --git a/quinn/src/incoming.rs b/quinn/src/incoming.rs index 0d3d02f12..a954b994e 100644 --- a/quinn/src/incoming.rs +++ b/quinn/src/incoming.rs @@ -1,12 +1,10 @@ use std::{ - fmt, future::{Future, IntoFuture}, net::{IpAddr, SocketAddr}, pin::Pin, task::{Context, Poll}, }; -use bytes::BytesMut; use proto::ConnectionError; use thiserror::Error; @@ -16,31 +14,24 @@ use crate::{ }; /// An incoming connection for which the server has not yet begun its part of the handshake +#[derive(Debug)] pub struct Incoming(Option); impl Incoming { - pub(crate) fn new( - inner: proto::Incoming, - endpoint: EndpointRef, - response_buffer: BytesMut, - ) -> Self { - Self(Some(State { - inner, - endpoint, - response_buffer, - })) + pub(crate) fn new(inner: proto::Incoming, endpoint: EndpointRef) -> Self { + Self(Some(State { inner, endpoint })) } /// Attempt to accept this incoming connection (an error may still occur) pub fn accept(mut self) -> Result { let state = self.0.take().unwrap(); - state.endpoint.accept(state.inner, state.response_buffer) + state.endpoint.accept(state.inner) } /// Reject this incoming connection attempt pub fn refuse(mut self) { let state = self.0.take().unwrap(); - state.endpoint.refuse(state.inner, state.response_buffer); + state.endpoint.refuse(state.inner); } /// Respond with a retry packet, requiring the client to retry with address validation @@ -48,16 +39,12 @@ impl Incoming { /// Errors if `remote_address_validated()` is true. pub fn retry(mut self) -> Result<(), RetryError> { let state = self.0.take().unwrap(); - state - .endpoint - .retry(state.inner, state.response_buffer) - .map_err(|(e, response_buffer)| { - RetryError(Self(Some(State { - inner: e.into_incoming(), - endpoint: state.endpoint, - response_buffer, - }))) - }) + state.endpoint.retry(state.inner).map_err(|e| { + RetryError(Self(Some(State { + inner: e.into_incoming(), + endpoint: state.endpoint, + }))) + }) } /// Ignore this incoming connection attempt, not sending any packet in response @@ -89,26 +76,15 @@ impl Drop for Incoming { fn drop(&mut self) { // Implicit reject, similar to Connection's implicit close if let Some(state) = self.0.take() { - state.endpoint.refuse(state.inner, state.response_buffer); + state.endpoint.refuse(state.inner); } } } -impl fmt::Debug for Incoming { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let state = self.0.as_ref().unwrap(); - f.debug_struct("Incoming") - .field("inner", &state.inner) - .field("endpoint", &state.endpoint) - // response_buffer is too big and not meaningful enough - .finish_non_exhaustive() - } -} - +#[derive(Debug)] struct State { inner: proto::Incoming, endpoint: EndpointRef, - response_buffer: BytesMut, } /// Error for attempting to retry an [`Incoming`] which already bears an address