From d61ae7d68d96de5401d153d14d511831c3acc2b4 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 27 Nov 2025 12:32:50 +0100 Subject: [PATCH 01/14] refactor: improve error storage for closed connections --- quinn-proto/src/connection/mod.rs | 129 +++++++++++++++++++++--------- quinn-proto/src/tests/mod.rs | 44 +++++++--- quinn/src/tests.rs | 4 +- 3 files changed, 130 insertions(+), 47 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 3d7e91c70..99f6f7325 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -205,8 +205,6 @@ pub struct Connection { timers: TimerTable, /// Number of packets received which could not be authenticated authentication_failures: u64, - /// Why the connection was lost, if it has been - error: Option, // // Queued non-retransmittable 1-RTT data @@ -409,7 +407,6 @@ impl Connection { }, timers: TimerTable::default(), authentication_failures: 0, - error: None, close: false, ack_frequency: AckFrequencyState::new(get_max_ack_delay( @@ -486,8 +483,9 @@ impl Connection { return Some(Event::Stream(event)); } - if let Some(err) = self.error.take() { - return Some(Event::ConnectionLost { reason: err }); + dbg!(&self.state); + if let Some(reason) = self.state.take_error() { + return Some(Event::ConnectionLost { reason }); } None @@ -881,11 +879,11 @@ impl Connection { // Check whether we need to send a close message let close = match self.state { - State::Drained => { + State::Drained { .. } => { self.app_limited = true; return None; } - State::Draining | State::Closed(_) => { + State::Draining { .. } | State::Closed { .. } => { // self.close is only reset once the associated packet had been // encoded successfully if !self.close { @@ -1210,7 +1208,10 @@ impl Connection { if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() { let max_frame_size = builder.frame_space_remaining(); match self.state { - State::Closed(state::Closed { ref reason }) => { + State::Closed { + reason: state::Closed { ref reason }, + .. + } => { if space_id == SpaceId::Data || reason.is_transport_layer() { reason.encode(&mut builder.frame_space_mut(), max_frame_size) } else { @@ -1222,7 +1223,7 @@ impl Connection { .encode(&mut builder.frame_space_mut(), max_frame_size) } } - State::Draining => frame::ConnectionClose { + State::Draining { .. } => frame::ConnectionClose { error_code: TransportErrorCode::NO_ERROR, frame_type: None, reason: Bytes::new(), @@ -1781,7 +1782,22 @@ impl Connection { match timer { Timer::Conn(timer) => match timer { ConnTimer::Close => { - self.state = State::Drained; + // TODO: what is the right error? + let error = self + .state + .take_error() + .map(|e| match e { + ConnectionError::ConnectionClosed(close) => { + if close.error_code == TransportErrorCode::PROTOCOL_VIOLATION { + ConnectionError::TransportError(close.error_code.into()) + } else { + ConnectionError::ConnectionClosed(close) + } + } + e => e, + }) + .unwrap_or(ConnectionError::LocallyClosed); + self.state = State::Drained { error: Some(error) }; self.endpoint_events.push_back(EndpointEventInner::Drained); } ConnTimer::Idle => { @@ -1952,7 +1968,10 @@ impl Connection { self.close_common(); self.set_close_timer(now); self.close = true; - self.state = State::Closed(state::Closed { reason }); + self.state = State::Closed { + error_read: false, + reason: state::Closed { reason }, + }; } } @@ -3508,7 +3527,6 @@ impl Connection { // State transitions for error cases if let Err(conn_err) = result { - self.error = Some(conn_err.clone()); self.state = match conn_err { ConnectionError::ApplicationClosed(reason) => State::closed(reason), ConnectionError::ConnectionClosed(reason) => State::closed(reason), @@ -3516,7 +3534,9 @@ impl Connection { | ConnectionError::TransportError(TransportError { code: TransportErrorCode::AEAD_LIMIT_REACHED, .. - }) => State::Drained, + }) => State::Drained { + error: Some(conn_err.clone()), + }, ConnectionError::TimedOut => { unreachable!("timeouts aren't generated by packet processing"); } @@ -3524,7 +3544,9 @@ impl Connection { debug!("closing connection due to transport error: {}", err); State::closed(err) } - ConnectionError::VersionMismatch => State::Draining, + ConnectionError::VersionMismatch => State::Draining { + error: Some(conn_err.clone()), + }, ConnectionError::LocallyClosed => { unreachable!("LocallyClosed isn't generated by packet processing"); } @@ -3548,7 +3570,7 @@ impl Connection { } // Transmit CONNECTION_CLOSE if necessary - if let State::Closed(_) = self.state { + if let State::Closed { .. } = self.state { // If there is no PathData for this PathId the packet was for a brand new // path. It was a valid packet however, so the remote is valid and we want to // send CONNECTION_CLOSE. @@ -3591,7 +3613,7 @@ impl Connection { } return Ok(()); } - State::Closed(_) => { + State::Closed { .. } => { for result in frame::Iter::new(packet.payload.freeze())? { let frame = match result { Ok(frame) => frame, @@ -3609,13 +3631,15 @@ impl Connection { if let Frame::Close(_) = frame { trace!("draining"); - self.state = State::Draining; + self.state = State::Draining { + error: Some(error.into()), + }; break; } } return Ok(()); } - State::Draining | State::Drained => return Ok(()), + State::Draining { .. } | State::Drained { .. } => return Ok(()), State::Handshake(ref mut state) => state, }; @@ -3912,12 +3936,12 @@ impl Connection { self.on_path_ack_received(now, packet.header.space(), ack)?; } Frame::Close(reason) => { - self.error = Some(reason.into()); - self.state = State::Draining; + self.state = State::Draining { + error: Some(reason.into()), + }; return Ok(()); } _ => { - dbg!(&frame); let mut err = TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake"); err.frame = Some(frame.ty()); @@ -4606,8 +4630,9 @@ impl Connection { self.streams.queue_max_stream_id(pending); if let Some(reason) = close { - self.error = Some(reason.into()); - self.state = State::Draining; + self.state = State::Draining { + error: Some(reason.into()), + }; self.close = true; } @@ -5806,8 +5831,9 @@ impl Connection { /// Terminate the connection instantly, without sending a close packet fn kill(&mut self, reason: ConnectionError) { self.close_common(); - self.error = Some(reason); - self.state = State::Drained; + self.state = State::Drained { + error: Some(reason), + }; self.endpoint_events.push_back(EndpointEventInner::Drained); } @@ -6274,21 +6300,33 @@ pub struct MultipathNotNegotiated { } #[allow(unreachable_pub)] // fuzzing only -#[derive(Clone)] +#[derive(Debug, Clone)] pub enum State { Handshake(state::Handshake), Established, - Closed(state::Closed), - Draining, + Closed { + reason: state::Closed, + error_read: bool, + }, + Draining { + /// Why the connection was lost, if it has been + error: Option, + }, /// Waiting for application to call close so we can dispose of the resources - Drained, + Drained { + /// Why the connection was lost, if it has been + error: Option, + }, } impl State { fn closed>(reason: R) -> Self { - Self::Closed(state::Closed { - reason: reason.into(), - }) + Self::Closed { + reason: state::Closed { + reason: reason.into(), + }, + error_read: false, + } } fn is_handshake(&self) -> bool { @@ -6300,11 +6338,30 @@ impl State { } fn is_closed(&self) -> bool { - matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained) + matches!( + *self, + Self::Closed { .. } | Self::Draining { .. } | Self::Drained { .. } + ) } fn is_drained(&self) -> bool { - matches!(*self, Self::Drained) + matches!(*self, Self::Drained { .. }) + } + + fn take_error(&mut self) -> Option { + match self { + Self::Draining { error } => error.take(), + Self::Drained { error } => error.take(), + Self::Closed { reason, error_read } => { + if *error_read { + None + } else { + *error_read = true; + Some(reason.clone().reason.into()) + } + } + Self::Handshake(_) | Self::Established => None, + } } } @@ -6312,7 +6369,7 @@ mod state { use super::*; #[allow(unreachable_pub)] // fuzzing only - #[derive(Clone)] + #[derive(Debug, Clone)] pub struct Handshake { /// Whether the remote CID has been set by the peer yet /// @@ -6344,7 +6401,7 @@ mod state { } #[allow(unreachable_pub)] // fuzzing only - #[derive(Clone)] + #[derive(Debug, Clone)] pub struct Closed { pub(super) reason: Close, } diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index c9fe23c9a..07bdae734 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -145,7 +145,11 @@ fn lifecycle() { Some(Event::ConnectionLost { reason: ConnectionError::ApplicationClosed( ApplicationClose { error_code: VarInt(42), ref reason } )}) if reason == REASON); - assert_matches!(pair.client_conn_mut(client_ch).poll(), None); + assert_matches!(pair.client_conn_mut(client_ch).poll(), + Some(Event::ConnectionLost { + reason: ConnectionError::ConnectionClosed(close) + }) if close.error_code == TransportErrorCode::NO_ERROR + ); assert_eq!(pair.client.known_connections(), 0); assert_eq!(pair.client.known_cids(), 0); assert_eq!(pair.server.known_connections(), 0); @@ -178,7 +182,11 @@ fn draft_version_compat() { Some(Event::ConnectionLost { reason: ConnectionError::ApplicationClosed( ApplicationClose { error_code: VarInt(42), ref reason } )}) if reason == REASON); - assert_matches!(pair.client_conn_mut(client_ch).poll(), None); + assert_matches!(pair.client_conn_mut(client_ch).poll(), + Some(Event::ConnectionLost { + reason: ConnectionError::ConnectionClosed(close) + }) if close.error_code == TransportErrorCode::NO_ERROR + ); assert_eq!(pair.client.known_connections(), 0); assert_eq!(pair.client.known_cids(), 0); assert_eq!(pair.server.known_connections(), 0); @@ -438,9 +446,13 @@ fn reject_self_signed_server_cert() { pair.drive(); - assert_matches!(pair.client_conn_mut(client_ch).poll(), - Some(Event::ConnectionLost { reason: ConnectionError::TransportError(ref error)}) - if error.code == TransportErrorCode::crypto(AlertDescription::UnknownCA.into())); + let a = pair.client_conn_mut(client_ch).poll(); + dbg!(&a); + assert_matches!( + a, + Some(Event::ConnectionLost { reason: ConnectionError::TransportError(ref error)}) + if error.code == TransportErrorCode::crypto(AlertDescription::UnknownCA.into()) + ); } #[test] @@ -884,8 +896,10 @@ fn alpn_mismatch() { ])))); pair.drive(); + let res = pair.client_conn_mut(client_ch).poll(); + dbg!(&res); assert_matches!( - pair.client_conn_mut(client_ch).poll(), + res, Some(Event::ConnectionLost { reason: ConnectionError::ConnectionClosed(err) }) if err.error_code == TransportErrorCode::crypto(0x78) ); } @@ -1133,14 +1147,19 @@ fn instant_close_1() { let mut pair = Pair::default(); info!("connecting"); let client_ch = pair.begin_connect(client_config()); + let reason = Bytes::new(); pair.client .connections .get_mut(&client_ch) .unwrap() - .close(pair.time, VarInt(0), Bytes::new()); + .close(pair.time, VarInt(0), reason); pair.drive(); let server_ch = pair.server.assert_accept(); - assert_matches!(pair.client_conn_mut(client_ch).poll(), None); + assert_matches!(pair.client_conn_mut(client_ch).poll(), + Some(Event::ConnectionLost { + reason: ConnectionError::ApplicationClosed(ApplicationClose { error_code: VarInt(0), ref reason }) + }) if reason == reason + ); assert_matches!( pair.server_conn_mut(server_ch).poll(), Some(Event::ConnectionLost { @@ -1160,13 +1179,18 @@ fn instant_close_2() { let client_ch = pair.begin_connect(client_config()); // Unlike `instant_close`, the server sees a valid Initial packet first. pair.drive_client(); + let reason = Bytes::new(); pair.client .connections .get_mut(&client_ch) .unwrap() - .close(pair.time, VarInt(42), Bytes::new()); + .close(pair.time, VarInt(42), reason); pair.drive(); - assert_matches!(pair.client_conn_mut(client_ch).poll(), None); + assert_matches!(pair.client_conn_mut(client_ch).poll(), + Some(Event::ConnectionLost { + reason: ConnectionError::ApplicationClosed(ApplicationClose { error_code: VarInt(42), ref reason }) + }) if reason == reason + ); let server_ch = pair.server.assert_accept(); assert_matches!( pair.server_conn_mut(server_ch).poll(), diff --git a/quinn/src/tests.rs b/quinn/src/tests.rs index f286cf3cd..8509cfddd 100755 --- a/quinn/src/tests.rs +++ b/quinn/src/tests.rs @@ -105,7 +105,9 @@ async fn close_endpoint() { .unwrap(); endpoint.close(0u32.into(), &[]); match conn.await { - Err(crate::ConnectionError::LocallyClosed) => (), + Err( + crate::ConnectionError::LocallyClosed | crate::ConnectionError::ApplicationClosed(_), + ) => (), Err(e) => panic!("unexpected error: {e}"), Ok(_) => { panic!("unexpected success"); From ca6eb1003ffb64430eb7d32e9a2c025944076270 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 27 Nov 2025 13:03:49 +0100 Subject: [PATCH 02/14] update tests --- quinn-proto/src/tests/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index 07bdae734..632f2a11d 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -450,8 +450,8 @@ fn reject_self_signed_server_cert() { dbg!(&a); assert_matches!( a, - Some(Event::ConnectionLost { reason: ConnectionError::TransportError(ref error)}) - if error.code == TransportErrorCode::crypto(AlertDescription::UnknownCA.into()) + Some(Event::ConnectionLost { reason: ConnectionError::ConnectionClosed(ref reason)}) + if reason.error_code == TransportErrorCode::crypto(AlertDescription::UnknownCA.into()) ); } @@ -509,8 +509,8 @@ fn reject_missing_client_cert() { Some(Event::HandshakeDataReady) ); assert_matches!(pair.server_conn_mut(server_ch).poll(), - Some(Event::ConnectionLost { reason: ConnectionError::TransportError(ref error)}) - if error.code == TransportErrorCode::crypto(AlertDescription::CertificateRequired.into())); + Some(Event::ConnectionLost { reason: ConnectionError::ConnectionClosed(ref close)}) + if close.error_code == TransportErrorCode::crypto(AlertDescription::CertificateRequired.into())); } #[test] From f6aa9bf2bda0a5f631fd83ddae32116da7d9ae78 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 27 Nov 2025 13:29:31 +0100 Subject: [PATCH 03/14] fallback to locally closed --- quinn-proto/src/connection/mod.rs | 8 +++----- quinn-proto/src/tests/mod.rs | 18 ++++++++++-------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 99f6f7325..ffdb1cc16 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1782,7 +1782,6 @@ impl Connection { match timer { Timer::Conn(timer) => match timer { ConnTimer::Close => { - // TODO: what is the right error? let error = self .state .take_error() @@ -1796,6 +1795,7 @@ impl Connection { } e => e, }) + // If we haven't received a close error we fallback to LocallyClosed .unwrap_or(ConnectionError::LocallyClosed); self.state = State::Drained { error: Some(error) }; self.endpoint_events.push_back(EndpointEventInner::Drained); @@ -3629,11 +3629,9 @@ impl Connection { self.stats.frame_rx.record(&frame); - if let Frame::Close(_) = frame { + if let Frame::Close(_error) = frame { trace!("draining"); - self.state = State::Draining { - error: Some(error.into()), - }; + self.state = State::Draining { error: None }; break; } } diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index 632f2a11d..3af05f7fe 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -145,10 +145,11 @@ fn lifecycle() { Some(Event::ConnectionLost { reason: ConnectionError::ApplicationClosed( ApplicationClose { error_code: VarInt(42), ref reason } )}) if reason == REASON); - assert_matches!(pair.client_conn_mut(client_ch).poll(), - Some(Event::ConnectionLost { - reason: ConnectionError::ConnectionClosed(close) - }) if close.error_code == TransportErrorCode::NO_ERROR + assert_matches!( + pair.client_conn_mut(client_ch).poll(), + Some(Event::ConnectionLost { + reason: ConnectionError::LocallyClosed + }) ); assert_eq!(pair.client.known_connections(), 0); assert_eq!(pair.client.known_cids(), 0); @@ -182,10 +183,11 @@ fn draft_version_compat() { Some(Event::ConnectionLost { reason: ConnectionError::ApplicationClosed( ApplicationClose { error_code: VarInt(42), ref reason } )}) if reason == REASON); - assert_matches!(pair.client_conn_mut(client_ch).poll(), - Some(Event::ConnectionLost { - reason: ConnectionError::ConnectionClosed(close) - }) if close.error_code == TransportErrorCode::NO_ERROR + assert_matches!( + pair.client_conn_mut(client_ch).poll(), + Some(Event::ConnectionLost { + reason: ConnectionError::LocallyClosed + }) ); assert_eq!(pair.client.known_connections(), 0); assert_eq!(pair.client.known_cids(), 0); From 5c0f27189983e1f7b0e7a0893ab311a3f316f2d3 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 27 Nov 2025 14:50:08 +0100 Subject: [PATCH 04/14] more consistent tracking of local close --- quinn-proto/src/connection/mod.rs | 414 +++++++++++++------- quinn-proto/src/connection/streams/state.rs | 16 +- quinn-proto/src/tests/mod.rs | 14 +- 3 files changed, 298 insertions(+), 146 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index ffdb1cc16..cad04968c 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -340,7 +340,7 @@ impl Connection { }; #[cfg(not(test))] let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng); - let state = State::Handshake(state::Handshake { + let state = State::handshake(state::Handshake { rem_cid_set: side.is_server(), expected_token: Bytes::new(), client_hello: None, @@ -483,7 +483,6 @@ impl Connection { return Some(Event::Stream(event)); } - dbg!(&self.state); if let Some(reason) = self.state.take_error() { return Some(Event::ConnectionLost { reason }); } @@ -878,12 +877,12 @@ impl Connection { // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths // Check whether we need to send a close message - let close = match self.state { - State::Drained { .. } => { + let close = match self.state.as_typ() { + StateTyp::Drained => { self.app_limited = true; return None; } - State::Draining { .. } | State::Closed { .. } => { + StateTyp::Draining | StateTyp::Closed => { // self.close is only reset once the associated packet had been // encoded successfully if !self.close { @@ -1207,11 +1206,9 @@ impl Connection { ); if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() { let max_frame_size = builder.frame_space_remaining(); - match self.state { - State::Closed { - reason: state::Closed { ref reason }, - .. - } => { + match self.state.as_typ() { + StateTyp::Closed => { + let reason = &self.state.as_closed().expect("checked").reason; if space_id == SpaceId::Data || reason.is_transport_layer() { reason.encode(&mut builder.frame_space_mut(), max_frame_size) } else { @@ -1223,7 +1220,7 @@ impl Connection { .encode(&mut builder.frame_space_mut(), max_frame_size) } } - State::Draining { .. } => frame::ConnectionClose { + StateTyp::Draining => frame::ConnectionClose { error_code: TransportErrorCode::NO_ERROR, frame_type: None, reason: Bytes::new(), @@ -1782,22 +1779,7 @@ impl Connection { match timer { Timer::Conn(timer) => match timer { ConnTimer::Close => { - let error = self - .state - .take_error() - .map(|e| match e { - ConnectionError::ConnectionClosed(close) => { - if close.error_code == TransportErrorCode::PROTOCOL_VIOLATION { - ConnectionError::TransportError(close.error_code.into()) - } else { - ConnectionError::ConnectionClosed(close) - } - } - e => e, - }) - // If we haven't received a close error we fallback to LocallyClosed - .unwrap_or(ConnectionError::LocallyClosed); - self.state = State::Drained { error: Some(error) }; + self.state.to_drained(None); self.endpoint_events.push_back(EndpointEventInner::Drained); } ConnTimer::Idle => { @@ -1968,10 +1950,7 @@ impl Connection { self.close_common(); self.set_close_timer(now); self.close = true; - self.state = State::Closed { - error_read: false, - reason: state::Closed { reason }, - }; + self.state.to_closed_locally(reason); } } @@ -3010,7 +2989,7 @@ impl Connection { // the real server. From now on we should no longer allow the server to migrate // its address. if space_id == SpaceId::Handshake { - if let State::Handshake(ref mut hs) = self.state { + if let Some(hs) = self.state.as_handshake_mut() { hs.allow_server_migration = false; } } @@ -3126,11 +3105,10 @@ impl Connection { let path_id = PathId::ZERO; self.path_data_mut(path_id).total_recvd = len as u64; - match self.state { - State::Handshake(ref mut state) => { - state.expected_token = packet.header.token.clone(); - } - _ => unreachable!("first packet must be delivered in Handshake state"), + if let Some(hs) = self.state.as_handshake_mut() { + hs.expected_token = packet.header.token.clone(); + } else { + unreachable!("first packet must be delivered in Handshake state"); } // The first packet is always on PathId::ZERO @@ -3270,9 +3248,9 @@ impl Connection { } let offset = self.spaces[space].crypto_offset; let outgoing = Bytes::from(outgoing); - if let State::Handshake(ref mut state) = self.state { + if let Some(hs) = self.state.as_handshake_mut() { if space == SpaceId::Initial && offset == 0 && self.side.is_client() { - state.client_hello = Some(outgoing.clone()); + hs.client_hello = Some(outgoing.clone()); } } self.spaces[space].crypto_offset += outgoing.len() as u64; @@ -3420,15 +3398,17 @@ impl Connection { return; } if remote != self.path_data_mut(path_id).remote { - match self.state { - State::Handshake(ref hs) if hs.allow_server_migration => { + if let Some(hs) = self.state.as_handshake() { + if hs.allow_server_migration { trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote"); self.path_data_mut(path_id).remote = remote; - } - _ => { + } else { debug!("discarding packet with unexpected remote during handshake"); return; } + } else { + debug!("discarding packet with unexpected remote during handshake"); + return; } } } @@ -3486,7 +3466,7 @@ impl Connection { return; } else { if let Header::Initial(InitialHeader { ref token, .. }) = packet.header { - if let State::Handshake(ref hs) = self.state { + if let Some(hs) = self.state.as_handshake() { if self.side.is_server() && token != &hs.expected_token { // Clients must send the same retry token in every Initial. Initial // packets can be spoofed, so we discard rather than killing the @@ -3527,26 +3507,26 @@ impl Connection { // State transitions for error cases if let Err(conn_err) = result { - self.state = match conn_err { - ConnectionError::ApplicationClosed(reason) => State::closed(reason), - ConnectionError::ConnectionClosed(reason) => State::closed(reason), + match conn_err { + ConnectionError::ApplicationClosed(reason) => self.state.to_closed(reason), + ConnectionError::ConnectionClosed(reason) => self.state.to_closed(reason), ConnectionError::Reset | ConnectionError::TransportError(TransportError { code: TransportErrorCode::AEAD_LIMIT_REACHED, .. - }) => State::Drained { - error: Some(conn_err.clone()), - }, + }) => { + self.state.to_drained(Some(conn_err)); + } ConnectionError::TimedOut => { unreachable!("timeouts aren't generated by packet processing"); } ConnectionError::TransportError(err) => { debug!("closing connection due to transport error: {}", err); - State::closed(err) + self.state.to_closed(err); + } + ConnectionError::VersionMismatch => { + self.state.to_draining(conn_err); } - ConnectionError::VersionMismatch => State::Draining { - error: Some(conn_err.clone()), - }, ConnectionError::LocallyClosed => { unreachable!("LocallyClosed isn't generated by packet processing"); } @@ -3570,7 +3550,7 @@ impl Connection { } // Transmit CONNECTION_CLOSE if necessary - if let State::Closed { .. } = self.state { + if matches!(self.state.as_typ(), StateTyp::Closed) { // If there is no PathData for this PathId the packet was for a brand new // path. It was a valid packet however, so the remote is valid and we want to // send CONNECTION_CLOSE. @@ -3598,8 +3578,8 @@ impl Connection { trace!(%path_id, ?number, "discarding packet for unknown path"); return Ok(()); } - let state = match self.state { - State::Established => { + let state = match self.state.as_typ() { + StateTyp::Established => { match packet.header.space() { SpaceId::Data => { self.process_payload(now, remote, path_id, number.unwrap(), packet)? @@ -3613,7 +3593,7 @@ impl Connection { } return Ok(()); } - State::Closed { .. } => { + StateTyp::Closed => { for result in frame::Iter::new(packet.payload.freeze())? { let frame = match result { Ok(frame) => frame, @@ -3631,14 +3611,14 @@ impl Connection { if let Frame::Close(_error) = frame { trace!("draining"); - self.state = State::Draining { error: None }; + self.state.to_draining_clean(); break; } } return Ok(()); } - State::Draining { .. } | State::Drained { .. } => return Ok(()), - State::Handshake(ref mut state) => state, + StateTyp::Draining | StateTyp::Drained => return Ok(()), + StateTyp::Handshake => self.state.as_handshake_mut().expect("checked"), }; match packet.header { @@ -3728,7 +3708,7 @@ impl Connection { }; *token = packet.payload.freeze().split_to(token_len); - self.state = State::Handshake(state::Handshake { + self.state = State::handshake(state::Handshake { expected_token: Bytes::new(), rem_cid_set: false, client_hello: None, @@ -3812,7 +3792,7 @@ impl Connection { } self.events.push_back(Event::Connected); - self.state = State::Established; + self.state.to_established(); trace!("established"); // Multipath can only be enabled after the state has reached Established. @@ -3836,7 +3816,7 @@ impl Connection { self.rem_handshake_cid = rem_cid; self.orig_rem_cid = rem_cid; state.rem_cid_set = true; - self.state = State::Handshake(state); + self.state.to_handshake(state); } else if rem_cid != self.rem_handshake_cid { debug!( "discarding packet with mismatched remote CID: {} != {}", @@ -3934,9 +3914,7 @@ impl Connection { self.on_path_ack_received(now, packet.header.space(), ack)?; } Frame::Close(reason) => { - self.state = State::Draining { - error: Some(reason.into()), - }; + self.state.to_draining(reason.into()); return Ok(()); } _ => { @@ -4628,9 +4606,7 @@ impl Connection { self.streams.queue_max_stream_id(pending); if let Some(reason) = close { - self.state = State::Draining { - error: Some(reason.into()), - }; + self.state.to_draining(reason.into()); self.close = true; } @@ -5829,9 +5805,7 @@ impl Connection { /// Terminate the connection instantly, without sending a close packet fn kill(&mut self, reason: ConnectionError) { self.close_common(); - self.state = State::Drained { - error: Some(reason), - }; + self.state.to_drained(Some(reason)); self.endpoint_events.push_back(EndpointEventInner::Drained); } @@ -6117,10 +6091,13 @@ impl ConnectionSide { fn remote_may_migrate(&self, state: &State) -> bool { match self { Self::Server { server_config } => server_config.migration, - Self::Client { .. } => match state { - State::Handshake(handshake) => handshake.allow_server_migration, - _ => false, - }, + Self::Client { .. } => { + if let Some(hs) = state.as_handshake() { + hs.allow_server_migration + } else { + false + } + } } } @@ -6297,74 +6274,247 @@ pub struct MultipathNotNegotiated { _private: (), } -#[allow(unreachable_pub)] // fuzzing only -#[derive(Debug, Clone)] -pub enum State { - Handshake(state::Handshake), - Established, - Closed { - reason: state::Closed, - error_read: bool, - }, - Draining { - /// Why the connection was lost, if it has been - error: Option, - }, - /// Waiting for application to call close so we can dispose of the resources - Drained { - /// Why the connection was lost, if it has been - error: Option, - }, -} +use state::{State, StateTyp}; -impl State { - fn closed>(reason: R) -> Self { - Self::Closed { - reason: state::Closed { - reason: reason.into(), - }, - error_read: false, - } - } +mod state { + use super::*; - fn is_handshake(&self) -> bool { - matches!(*self, Self::Handshake(_)) + #[allow(unreachable_pub)] // fuzzing only + #[derive(Debug, Clone)] + pub struct State { + inner: InnerState, } - fn is_established(&self) -> bool { - matches!(*self, Self::Established) - } + impl State { + pub(super) fn as_handshake_mut(&mut self) -> Option<&mut state::Handshake> { + if let InnerState::Handshake(ref mut hs) = self.inner { + Some(hs) + } else { + None + } + } - fn is_closed(&self) -> bool { - matches!( - *self, - Self::Closed { .. } | Self::Draining { .. } | Self::Drained { .. } - ) + pub(super) fn as_handshake(&self) -> Option<&state::Handshake> { + if let InnerState::Handshake(ref hs) = self.inner { + Some(hs) + } else { + None + } + } + + pub(super) fn as_closed(&self) -> Option<&Closed> { + if let InnerState::Closed { + ref remote_reason, .. + } = self.inner + { + Some(remote_reason) + } else { + None + } + } + + #[allow(unreachable_pub)] // fuzzing only + #[cfg(test)] + pub fn established() -> Self { + Self { + inner: InnerState::Established, + } + } + + pub(super) fn handshake(handshake: state::Handshake) -> Self { + Self { + inner: InnerState::Handshake(handshake), + } + } + + pub(super) fn to_handshake(&mut self, hs: state::Handshake) { + self.inner = InnerState::Handshake(hs); + } + + pub(super) fn to_established(&mut self) { + self.inner = InnerState::Established; + } + + pub(super) fn to_drained(&mut self, error: Option) { + let error = if let Some(error) = error { + Some(error) + } else { + match &mut self.inner { + InnerState::Draining { error } => error.take(), + InnerState::Drained { error } => error.take(), + InnerState::Closed { + remote_reason, + local_reason, + error_read, + } => { + if *error_read { + None + } else { + *error_read = true; + if *local_reason { + Some(ConnectionError::LocallyClosed) + } else { + let error = match remote_reason.clone().reason.into() { + ConnectionError::ConnectionClosed(close) => { + if close.error_code + == TransportErrorCode::PROTOCOL_VIOLATION + { + ConnectionError::TransportError(close.error_code.into()) + } else { + ConnectionError::ConnectionClosed(close) + } + } + e => e, + }; + Some(error) + } + } + } + InnerState::Handshake(_) | InnerState::Established => None, + } + }; + self.inner = InnerState::Drained { error }; + } + + pub(super) fn to_draining(&mut self, error: ConnectionError) { + self.inner = InnerState::Draining { error: Some(error) }; + } + + pub(super) fn to_draining_clean(&mut self) { + let mut error = None; + if let InnerState::Closed { local_reason, .. } = &self.inner { + if *local_reason { + error.replace(ConnectionError::LocallyClosed); + } + } + self.inner = InnerState::Draining { error }; + } + + pub(super) fn to_closed>(&mut self, reason: R) { + self.inner = InnerState::Closed { + error_read: false, + remote_reason: state::Closed { + reason: reason.into(), + }, + local_reason: false, + }; + } + + pub(super) fn to_closed_locally>(&mut self, reason: R) { + self.inner = InnerState::Closed { + error_read: false, + remote_reason: state::Closed { + reason: reason.into(), + }, + local_reason: true, + }; + } + + pub(super) fn is_handshake(&self) -> bool { + self.inner.is_handshake() + } + + pub(super) fn is_established(&self) -> bool { + self.inner.is_established() + } + + pub(super) fn is_closed(&self) -> bool { + self.inner.is_closed() + } + + pub(super) fn is_drained(&self) -> bool { + self.inner.is_drained() + } + + pub(super) fn take_error(&mut self) -> Option { + self.inner.take_error() + } + + pub(super) fn as_typ(&self) -> StateTyp { + match self.inner { + InnerState::Handshake(_) => StateTyp::Handshake, + InnerState::Established => StateTyp::Established, + InnerState::Closed { .. } => StateTyp::Closed, + InnerState::Draining { .. } => StateTyp::Draining, + InnerState::Drained { .. } => StateTyp::Drained, + } + } } - fn is_drained(&self) -> bool { - matches!(*self, Self::Drained { .. }) + #[allow(unreachable_pub)] // fuzzing only + #[derive(Debug, Clone)] + pub enum StateTyp { + Handshake, + Established, + Closed, + Draining, + Drained, } - fn take_error(&mut self) -> Option { - match self { - Self::Draining { error } => error.take(), - Self::Drained { error } => error.take(), - Self::Closed { reason, error_read } => { - if *error_read { - None - } else { - *error_read = true; - Some(reason.clone().reason.into()) + #[derive(Debug, Clone)] + enum InnerState { + Handshake(state::Handshake), + Established, + Closed { + remote_reason: state::Closed, + local_reason: bool, + error_read: bool, + }, + Draining { + /// Why the connection was lost, if it has been + error: Option, + }, + /// Waiting for application to call close so we can dispose of the resources + Drained { + /// Why the connection was lost, if it has been + error: Option, + }, + } + + impl InnerState { + fn is_handshake(&self) -> bool { + matches!(*self, Self::Handshake(_)) + } + + fn is_established(&self) -> bool { + matches!(*self, Self::Established) + } + + fn is_closed(&self) -> bool { + matches!( + *self, + Self::Closed { .. } | Self::Draining { .. } | Self::Drained { .. } + ) + } + + fn is_drained(&self) -> bool { + matches!(*self, Self::Drained { .. }) + } + + fn take_error(&mut self) -> Option { + match self { + Self::Draining { error } => error.take(), + Self::Drained { error } => error.take(), + Self::Closed { + remote_reason, + local_reason, + error_read, + } => { + if *error_read { + None + } else { + *error_read = true; + if *local_reason { + Some(ConnectionError::LocallyClosed) + } else { + Some(remote_reason.clone().reason.into()) + } + } } + Self::Handshake(_) | Self::Established => None, } - Self::Handshake(_) | Self::Established => None, } } -} - -mod state { - use super::*; #[allow(unreachable_pub)] // fuzzing only #[derive(Debug, Clone)] diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index 7837b0707..c87bcc09b 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -1280,7 +1280,7 @@ mod tests { ..TransportParameters::default() }); - let (mut pending, state) = (Retransmits::default(), ConnState::Established); + let (mut pending, state) = (Retransmits::default(), ConnState::established()); let id = Streams { state: &mut server, conn_state: &state, @@ -1341,7 +1341,7 @@ mod tests { ..TransportParameters::default() }); - let (mut pending, state) = (Retransmits::default(), ConnState::Established); + let (mut pending, state) = (Retransmits::default(), ConnState::established()); let mut streams = Streams { state: &mut server, conn_state: &state, @@ -1397,7 +1397,7 @@ mod tests { ..TransportParameters::default() }); - let (mut pending, state) = (Retransmits::default(), ConnState::Established); + let (mut pending, state) = (Retransmits::default(), ConnState::established()); let mut streams = Streams { state: &mut server, conn_state: &state, @@ -1467,7 +1467,7 @@ mod tests { ..TransportParameters::default() }); - let (mut pending, state) = (Retransmits::default(), ConnState::Established); + let (mut pending, state) = (Retransmits::default(), ConnState::established()); let mut streams = Streams { state: &mut server, conn_state: &state, @@ -1548,7 +1548,7 @@ mod tests { ..TransportParameters::default() }); - let (mut pending, state) = (Retransmits::default(), ConnState::Established); + let (mut pending, state) = (Retransmits::default(), ConnState::established()); let mut streams = Streams { state: &mut server, conn_state: &state, @@ -1654,7 +1654,7 @@ mod tests { initial_max_stream_data_uni: 42u32.into(), ..TransportParameters::default() }); - let (mut pending, state) = (Retransmits::default(), ConnState::Established); + let (mut pending, state) = (Retransmits::default(), ConnState::established()); let mut streams = Streams { state: &mut server, conn_state: &state, @@ -1957,7 +1957,7 @@ mod tests { assert_eq!(server.poll(), None); let mut retransmits = Retransmits::default(); - let conn_state = ConnState::Established; + let conn_state = ConnState::established(); let stream_id = Streams { state: &mut server, @@ -2032,7 +2032,7 @@ mod tests { assert_eq!(server.poll(), None); let mut retransmits = Retransmits::default(); - let conn_state = ConnState::Established; + let conn_state = ConnState::established(); let stream_id = Streams { state: &mut server, diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index 3af05f7fe..c5e16594a 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -1157,10 +1157,11 @@ fn instant_close_1() { .close(pair.time, VarInt(0), reason); pair.drive(); let server_ch = pair.server.assert_accept(); - assert_matches!(pair.client_conn_mut(client_ch).poll(), + assert_matches!( + pair.client_conn_mut(client_ch).poll(), Some(Event::ConnectionLost { - reason: ConnectionError::ApplicationClosed(ApplicationClose { error_code: VarInt(0), ref reason }) - }) if reason == reason + reason: ConnectionError::LocallyClosed + }) ); assert_matches!( pair.server_conn_mut(server_ch).poll(), @@ -1188,10 +1189,11 @@ fn instant_close_2() { .unwrap() .close(pair.time, VarInt(42), reason); pair.drive(); - assert_matches!(pair.client_conn_mut(client_ch).poll(), + assert_matches!( + pair.client_conn_mut(client_ch).poll(), Some(Event::ConnectionLost { - reason: ConnectionError::ApplicationClosed(ApplicationClose { error_code: VarInt(42), ref reason }) - }) if reason == reason + reason: ConnectionError::LocallyClosed + }) ); let server_ch = pair.server.assert_accept(); assert_matches!( From 50ec3afd88d4256fd1cdf4e1b2d5b2d68942fdbc Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 27 Nov 2025 14:55:47 +0100 Subject: [PATCH 05/14] move state to its own file --- quinn-proto/src/connection/mod.rs | 284 +--------------------------- quinn-proto/src/connection/state.rs | 255 +++++++++++++++++++++++++ 2 files changed, 258 insertions(+), 281 deletions(-) create mode 100644 quinn-proto/src/connection/state.rs diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index cad04968c..4c8206825 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -98,6 +98,9 @@ use timer::{Timer, TimerTable}; mod transmit_buf; use transmit_buf::TransmitBuf; +mod state; +use state::{State, StateTyp}; + /// Protocol state and logic for a single QUIC connection /// /// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application @@ -6274,287 +6277,6 @@ pub struct MultipathNotNegotiated { _private: (), } -use state::{State, StateTyp}; - -mod state { - use super::*; - - #[allow(unreachable_pub)] // fuzzing only - #[derive(Debug, Clone)] - pub struct State { - inner: InnerState, - } - - impl State { - pub(super) fn as_handshake_mut(&mut self) -> Option<&mut state::Handshake> { - if let InnerState::Handshake(ref mut hs) = self.inner { - Some(hs) - } else { - None - } - } - - pub(super) fn as_handshake(&self) -> Option<&state::Handshake> { - if let InnerState::Handshake(ref hs) = self.inner { - Some(hs) - } else { - None - } - } - - pub(super) fn as_closed(&self) -> Option<&Closed> { - if let InnerState::Closed { - ref remote_reason, .. - } = self.inner - { - Some(remote_reason) - } else { - None - } - } - - #[allow(unreachable_pub)] // fuzzing only - #[cfg(test)] - pub fn established() -> Self { - Self { - inner: InnerState::Established, - } - } - - pub(super) fn handshake(handshake: state::Handshake) -> Self { - Self { - inner: InnerState::Handshake(handshake), - } - } - - pub(super) fn to_handshake(&mut self, hs: state::Handshake) { - self.inner = InnerState::Handshake(hs); - } - - pub(super) fn to_established(&mut self) { - self.inner = InnerState::Established; - } - - pub(super) fn to_drained(&mut self, error: Option) { - let error = if let Some(error) = error { - Some(error) - } else { - match &mut self.inner { - InnerState::Draining { error } => error.take(), - InnerState::Drained { error } => error.take(), - InnerState::Closed { - remote_reason, - local_reason, - error_read, - } => { - if *error_read { - None - } else { - *error_read = true; - if *local_reason { - Some(ConnectionError::LocallyClosed) - } else { - let error = match remote_reason.clone().reason.into() { - ConnectionError::ConnectionClosed(close) => { - if close.error_code - == TransportErrorCode::PROTOCOL_VIOLATION - { - ConnectionError::TransportError(close.error_code.into()) - } else { - ConnectionError::ConnectionClosed(close) - } - } - e => e, - }; - Some(error) - } - } - } - InnerState::Handshake(_) | InnerState::Established => None, - } - }; - self.inner = InnerState::Drained { error }; - } - - pub(super) fn to_draining(&mut self, error: ConnectionError) { - self.inner = InnerState::Draining { error: Some(error) }; - } - - pub(super) fn to_draining_clean(&mut self) { - let mut error = None; - if let InnerState::Closed { local_reason, .. } = &self.inner { - if *local_reason { - error.replace(ConnectionError::LocallyClosed); - } - } - self.inner = InnerState::Draining { error }; - } - - pub(super) fn to_closed>(&mut self, reason: R) { - self.inner = InnerState::Closed { - error_read: false, - remote_reason: state::Closed { - reason: reason.into(), - }, - local_reason: false, - }; - } - - pub(super) fn to_closed_locally>(&mut self, reason: R) { - self.inner = InnerState::Closed { - error_read: false, - remote_reason: state::Closed { - reason: reason.into(), - }, - local_reason: true, - }; - } - - pub(super) fn is_handshake(&self) -> bool { - self.inner.is_handshake() - } - - pub(super) fn is_established(&self) -> bool { - self.inner.is_established() - } - - pub(super) fn is_closed(&self) -> bool { - self.inner.is_closed() - } - - pub(super) fn is_drained(&self) -> bool { - self.inner.is_drained() - } - - pub(super) fn take_error(&mut self) -> Option { - self.inner.take_error() - } - - pub(super) fn as_typ(&self) -> StateTyp { - match self.inner { - InnerState::Handshake(_) => StateTyp::Handshake, - InnerState::Established => StateTyp::Established, - InnerState::Closed { .. } => StateTyp::Closed, - InnerState::Draining { .. } => StateTyp::Draining, - InnerState::Drained { .. } => StateTyp::Drained, - } - } - } - - #[allow(unreachable_pub)] // fuzzing only - #[derive(Debug, Clone)] - pub enum StateTyp { - Handshake, - Established, - Closed, - Draining, - Drained, - } - - #[derive(Debug, Clone)] - enum InnerState { - Handshake(state::Handshake), - Established, - Closed { - remote_reason: state::Closed, - local_reason: bool, - error_read: bool, - }, - Draining { - /// Why the connection was lost, if it has been - error: Option, - }, - /// Waiting for application to call close so we can dispose of the resources - Drained { - /// Why the connection was lost, if it has been - error: Option, - }, - } - - impl InnerState { - fn is_handshake(&self) -> bool { - matches!(*self, Self::Handshake(_)) - } - - fn is_established(&self) -> bool { - matches!(*self, Self::Established) - } - - fn is_closed(&self) -> bool { - matches!( - *self, - Self::Closed { .. } | Self::Draining { .. } | Self::Drained { .. } - ) - } - - fn is_drained(&self) -> bool { - matches!(*self, Self::Drained { .. }) - } - - fn take_error(&mut self) -> Option { - match self { - Self::Draining { error } => error.take(), - Self::Drained { error } => error.take(), - Self::Closed { - remote_reason, - local_reason, - error_read, - } => { - if *error_read { - None - } else { - *error_read = true; - if *local_reason { - Some(ConnectionError::LocallyClosed) - } else { - Some(remote_reason.clone().reason.into()) - } - } - } - Self::Handshake(_) | Self::Established => None, - } - } - } - - #[allow(unreachable_pub)] // fuzzing only - #[derive(Debug, Clone)] - pub struct Handshake { - /// Whether the remote CID has been set by the peer yet - /// - /// Always set for servers - pub(super) rem_cid_set: bool, - /// Stateless retry token received in the first Initial by a server. - /// - /// Must be present in every Initial. Always empty for clients. - pub(super) expected_token: Bytes, - /// First cryptographic message - /// - /// Only set for clients - pub(super) client_hello: Option, - /// Whether the server address is allowed to migrate - /// - /// We allow the server to migrate during the handshake as long as we have not - /// received an authenticated handshake packet: it can send a response from a - /// different address than we sent the initial to. This allows us to send the - /// initial packet over multiple paths - by means of an IPv6 ULA address that copies - /// the packets sent to it to multiple destinations - and accept one response. - /// - /// This is only ever set to true if for a client which hasn't yet received an - /// authenticated handshake packet. It is set back to false in - /// [`Connection::on_packet_authenticated`]. - /// - /// THIS IS NOT RFC 9000 COMPLIANT! A server is not allowed to migrate addresses, - /// other than using the preferred-address transport parameter. - pub(super) allow_server_migration: bool, - } - - #[allow(unreachable_pub)] // fuzzing only - #[derive(Debug, Clone)] - pub struct Closed { - pub(super) reason: Close, - } -} - /// Events of interest to the application #[derive(Debug)] pub enum Event { diff --git a/quinn-proto/src/connection/state.rs b/quinn-proto/src/connection/state.rs new file mode 100644 index 000000000..1549400c2 --- /dev/null +++ b/quinn-proto/src/connection/state.rs @@ -0,0 +1,255 @@ +use bytes::Bytes; + +use crate::frame::Close; +use crate::{ConnectionError, TransportErrorCode}; + +#[allow(unreachable_pub)] // fuzzing only +#[derive(Debug, Clone)] +pub struct State { + inner: InnerState, +} + +impl State { + pub(super) fn as_handshake_mut(&mut self) -> Option<&mut Handshake> { + if let InnerState::Handshake(ref mut hs) = self.inner { + Some(hs) + } else { + None + } + } + + pub(super) fn as_handshake(&self) -> Option<&Handshake> { + if let InnerState::Handshake(ref hs) = self.inner { + Some(hs) + } else { + None + } + } + + pub(super) fn as_closed(&self) -> Option<&Closed> { + if let InnerState::Closed { + ref remote_reason, .. + } = self.inner + { + Some(remote_reason) + } else { + None + } + } + + #[allow(unreachable_pub)] // fuzzing only + #[cfg(test)] + pub fn established() -> Self { + Self { + inner: InnerState::Established, + } + } + + pub(super) fn handshake(handshake: Handshake) -> Self { + Self { + inner: InnerState::Handshake(handshake), + } + } + + pub(super) fn to_handshake(&mut self, hs: Handshake) { + self.inner = InnerState::Handshake(hs); + } + + pub(super) fn to_established(&mut self) { + self.inner = InnerState::Established; + } + + pub(super) fn to_drained(&mut self, error: Option) { + let error = if let Some(error) = error { + Some(error) + } else { + match &mut self.inner { + InnerState::Draining { error } => error.take(), + InnerState::Drained { error } => error.take(), + InnerState::Closed { + remote_reason, + local_reason, + error_read, + } => { + if *error_read { + None + } else { + *error_read = true; + if *local_reason { + Some(ConnectionError::LocallyClosed) + } else { + let error = match remote_reason.clone().reason.into() { + ConnectionError::ConnectionClosed(close) => { + if close.error_code == TransportErrorCode::PROTOCOL_VIOLATION { + ConnectionError::TransportError(close.error_code.into()) + } else { + ConnectionError::ConnectionClosed(close) + } + } + e => e, + }; + Some(error) + } + } + } + InnerState::Handshake(_) | InnerState::Established => None, + } + }; + self.inner = InnerState::Drained { error }; + } + + pub(super) fn to_draining(&mut self, error: ConnectionError) { + self.inner = InnerState::Draining { error: Some(error) }; + } + + pub(super) fn to_draining_clean(&mut self) { + let mut error = None; + if let InnerState::Closed { local_reason, .. } = &self.inner { + if *local_reason { + error.replace(ConnectionError::LocallyClosed); + } + } + self.inner = InnerState::Draining { error }; + } + + pub(super) fn to_closed>(&mut self, reason: R) { + self.inner = InnerState::Closed { + error_read: false, + remote_reason: Closed { + reason: reason.into(), + }, + local_reason: false, + }; + } + + pub(super) fn to_closed_locally>(&mut self, reason: R) { + self.inner = InnerState::Closed { + error_read: false, + remote_reason: Closed { + reason: reason.into(), + }, + local_reason: true, + }; + } + + pub(super) fn is_handshake(&self) -> bool { + matches!(self.inner, InnerState::Handshake(_)) + } + + pub(super) fn is_established(&self) -> bool { + matches!(self.inner, InnerState::Established) + } + + pub(super) fn is_closed(&self) -> bool { + matches!( + self.inner, + InnerState::Closed { .. } | InnerState::Draining { .. } | InnerState::Drained { .. } + ) + } + + pub(super) fn is_drained(&self) -> bool { + matches!(self.inner, InnerState::Drained { .. }) + } + + pub(super) fn take_error(&mut self) -> Option { + match &mut self.inner { + InnerState::Draining { error } => error.take(), + InnerState::Drained { error } => error.take(), + InnerState::Closed { + remote_reason, + local_reason, + error_read, + } => { + if *error_read { + None + } else { + *error_read = true; + if *local_reason { + Some(ConnectionError::LocallyClosed) + } else { + Some(remote_reason.clone().reason.into()) + } + } + } + InnerState::Handshake(_) | InnerState::Established => None, + } + } + + pub(super) fn as_typ(&self) -> StateTyp { + match self.inner { + InnerState::Handshake(_) => StateTyp::Handshake, + InnerState::Established => StateTyp::Established, + InnerState::Closed { .. } => StateTyp::Closed, + InnerState::Draining { .. } => StateTyp::Draining, + InnerState::Drained { .. } => StateTyp::Drained, + } + } +} + +#[allow(unreachable_pub)] // fuzzing only +#[derive(Debug, Clone)] +pub enum StateTyp { + Handshake, + Established, + Closed, + Draining, + Drained, +} + +#[derive(Debug, Clone)] +enum InnerState { + Handshake(Handshake), + Established, + Closed { + remote_reason: Closed, + local_reason: bool, + error_read: bool, + }, + Draining { + /// Why the connection was lost, if it has been + error: Option, + }, + /// Waiting for application to call close so we can dispose of the resources + Drained { + /// Why the connection was lost, if it has been + error: Option, + }, +} + +#[allow(unreachable_pub)] // fuzzing only +#[derive(Debug, Clone)] +pub struct Handshake { + /// Whether the remote CID has been set by the peer yet + /// + /// Always set for servers + pub(super) rem_cid_set: bool, + /// Stateless retry token received in the first Initial by a server. + /// + /// Must be present in every Initial. Always empty for clients. + pub(super) expected_token: Bytes, + /// First cryptographic message + /// + /// Only set for clients + pub(super) client_hello: Option, + /// Whether the server address is allowed to migrate + /// + /// We allow the server to migrate during the handshake as long as we have not + /// received an authenticated handshake packet: it can send a response from a + /// different address than we sent the initial to. This allows us to send the + /// initial packet over multiple paths - by means of an IPv6 ULA address that copies + /// the packets sent to it to multiple destinations - and accept one response. + /// + /// This is only ever set to true if for a client which hasn't yet received an + /// authenticated handshake packet. It is set back to false in + /// [`Connection::on_packet_authenticated`]. + /// + /// THIS IS NOT RFC 9000 COMPLIANT! A server is not allowed to migrate addresses, + /// other than using the preferred-address transport parameter. + pub(super) allow_server_migration: bool, +} + +#[allow(unreachable_pub)] // fuzzing only +#[derive(Debug, Clone)] +pub struct Closed { + pub(super) reason: Close, +} From 231dfff00dcff8bc5d299db23ac904b3c2736e2a Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 27 Nov 2025 14:58:25 +0100 Subject: [PATCH 06/14] add some comments --- quinn-proto/src/connection/state.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/quinn-proto/src/connection/state.rs b/quinn-proto/src/connection/state.rs index 1549400c2..190b8a213 100644 --- a/quinn-proto/src/connection/state.rs +++ b/quinn-proto/src/connection/state.rs @@ -200,9 +200,13 @@ pub enum StateTyp { enum InnerState { Handshake(Handshake), Established, + // TODO: should this be split into `ClosedLocal` and `ClosedRemote`? Closed { + /// The reason the remote closed the connection, or the reason we are sending to the remote. remote_reason: Closed, + /// Set to true if we closed the connection locally local_reason: bool, + /// Did we read this as error already? error_read: bool, }, Draining { From 8d55cde675292444d53fb86fc83e5b6399c6a4e7 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 28 Nov 2025 10:51:37 +0100 Subject: [PATCH 07/14] undo changes to tests and create failing test --- quinn-proto/src/connection/mod.rs | 9 +- quinn-proto/src/connection/state.rs | 132 ++++++++++++++++++++-------- quinn-proto/src/tests/mod.rs | 32 ++----- 3 files changed, 104 insertions(+), 69 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 4c8206825..5b06f4aef 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -486,6 +486,7 @@ impl Connection { return Some(Event::Stream(event)); } + dbg!(&self.state); if let Some(reason) = self.state.take_error() { return Some(Event::ConnectionLost { reason }); } @@ -3528,7 +3529,7 @@ impl Connection { self.state.to_closed(err); } ConnectionError::VersionMismatch => { - self.state.to_draining(conn_err); + self.state.to_draining(Some(conn_err)); } ConnectionError::LocallyClosed => { unreachable!("LocallyClosed isn't generated by packet processing"); @@ -3614,7 +3615,7 @@ impl Connection { if let Frame::Close(_error) = frame { trace!("draining"); - self.state.to_draining_clean(); + self.state.to_draining(None); break; } } @@ -3917,7 +3918,7 @@ impl Connection { self.on_path_ack_received(now, packet.header.space(), ack)?; } Frame::Close(reason) => { - self.state.to_draining(reason.into()); + self.state.to_draining(Some(reason.into())); return Ok(()); } _ => { @@ -4609,7 +4610,7 @@ impl Connection { self.streams.queue_max_stream_id(pending); if let Some(reason) = close { - self.state.to_draining(reason.into()); + self.state.to_draining(Some(reason.into())); self.close = true; } diff --git a/quinn-proto/src/connection/state.rs b/quinn-proto/src/connection/state.rs index 190b8a213..0ade02ec4 100644 --- a/quinn-proto/src/connection/state.rs +++ b/quinn-proto/src/connection/state.rs @@ -60,75 +60,99 @@ impl State { } pub(super) fn to_drained(&mut self, error: Option) { - let error = if let Some(error) = error { - Some(error) + dbg!(&self, &error); + let (error, is_local) = if let Some(error) = error { + (Some(error), false) } else { - match &mut self.inner { - InnerState::Draining { error } => error.take(), - InnerState::Drained { error } => error.take(), + let error = match &mut self.inner { + InnerState::Draining { error, .. } => error.take(), + InnerState::Drained { .. } => panic!("invalid state transition drained -> drained"), InnerState::Closed { remote_reason, - local_reason, error_read, + .. } => { if *error_read { None } else { *error_read = true; - if *local_reason { - Some(ConnectionError::LocallyClosed) - } else { - let error = match remote_reason.clone().reason.into() { - ConnectionError::ConnectionClosed(close) => { - if close.error_code == TransportErrorCode::PROTOCOL_VIOLATION { - ConnectionError::TransportError(close.error_code.into()) - } else { - ConnectionError::ConnectionClosed(close) - } + let error = match remote_reason.clone().reason.into() { + ConnectionError::ConnectionClosed(close) => { + if close.error_code == TransportErrorCode::PROTOCOL_VIOLATION { + ConnectionError::TransportError(close.error_code.into()) + } else { + ConnectionError::ConnectionClosed(close) } - e => e, - }; - Some(error) - } + } + e => e, + }; + Some(error) } } InnerState::Handshake(_) | InnerState::Established => None, - } + }; + (error, self.is_local_close()) }; - self.inner = InnerState::Drained { error }; + self.inner = InnerState::Drained { error, is_local }; } - pub(super) fn to_draining(&mut self, error: ConnectionError) { - self.inner = InnerState::Draining { error: Some(error) }; + pub(super) fn to_draining(&mut self, error: Option) { + dbg!(&self, &error); + assert!( + matches!( + self.inner, + InnerState::Handshake(_) | InnerState::Established | InnerState::Closed { .. } + ), + "invalid state transition {:?} -> draining", + self.as_typ() + ); + let is_local = self.is_local_close(); + self.inner = InnerState::Draining { error, is_local }; } - pub(super) fn to_draining_clean(&mut self) { - let mut error = None; - if let InnerState::Closed { local_reason, .. } = &self.inner { - if *local_reason { - error.replace(ConnectionError::LocallyClosed); - } + fn is_local_close(&self) -> bool { + match self.inner { + InnerState::Handshake(_) => false, + InnerState::Established => false, + InnerState::Closed { is_local, .. } => is_local, + InnerState::Draining { is_local, .. } => is_local, + InnerState::Drained { is_local, .. } => is_local, } - self.inner = InnerState::Draining { error }; } pub(super) fn to_closed>(&mut self, reason: R) { + assert!( + matches!( + self.inner, + InnerState::Handshake(_) | InnerState::Established + ), + "invalid state transition {:?} -> closed", + self.as_typ() + ); self.inner = InnerState::Closed { error_read: false, remote_reason: Closed { reason: reason.into(), }, - local_reason: false, + is_local: false, }; } pub(super) fn to_closed_locally>(&mut self, reason: R) { + assert!( + matches!( + self.inner, + InnerState::Handshake(_) | InnerState::Established + ), + "invalid state transition {:?} -> closed", + self.as_typ() + ); self.inner = InnerState::Closed { error_read: false, remote_reason: Closed { reason: reason.into(), }, - local_reason: true, + is_local: true, }; } @@ -153,11 +177,23 @@ impl State { pub(super) fn take_error(&mut self) -> Option { match &mut self.inner { - InnerState::Draining { error } => error.take(), - InnerState::Drained { error } => error.take(), + InnerState::Draining { error, is_local } => { + if !*is_local { + error.take() + } else { + None + } + } + InnerState::Drained { error, is_local } => { + if !*is_local { + error.take() + } else { + None + } + } InnerState::Closed { remote_reason, - local_reason, + is_local: local_reason, error_read, } => { if *error_read { @@ -165,7 +201,7 @@ impl State { } else { *error_read = true; if *local_reason { - Some(ConnectionError::LocallyClosed) + None } else { Some(remote_reason.clone().reason.into()) } @@ -205,18 +241,20 @@ enum InnerState { /// The reason the remote closed the connection, or the reason we are sending to the remote. remote_reason: Closed, /// Set to true if we closed the connection locally - local_reason: bool, + is_local: bool, /// Did we read this as error already? error_read: bool, }, Draining { /// Why the connection was lost, if it has been error: Option, + is_local: bool, }, /// Waiting for application to call close so we can dispose of the resources Drained { /// Why the connection was lost, if it has been error: Option, + is_local: bool, }, } @@ -257,3 +295,21 @@ pub struct Handshake { pub struct Closed { pub(super) reason: Close, } + +#[cfg(test)] +mod tests { + use super::*; + + /// This makes sure that the assumption of error set if drained holds up. + #[test] + fn test_always_error_if_drained() { + let mut state = State { + inner: InnerState::Draining { + error: Some(ConnectionError::Reset), + is_local: true, + }, + }; + state.to_drained(None); + assert_eq!(state.take_error(), Some(ConnectionError::Reset)); + } +} diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index c5e16594a..ea1142626 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -145,12 +145,7 @@ fn lifecycle() { Some(Event::ConnectionLost { reason: ConnectionError::ApplicationClosed( ApplicationClose { error_code: VarInt(42), ref reason } )}) if reason == REASON); - assert_matches!( - pair.client_conn_mut(client_ch).poll(), - Some(Event::ConnectionLost { - reason: ConnectionError::LocallyClosed - }) - ); + assert_matches!(pair.client_conn_mut(client_ch).poll(), None); assert_eq!(pair.client.known_connections(), 0); assert_eq!(pair.client.known_cids(), 0); assert_eq!(pair.server.known_connections(), 0); @@ -183,12 +178,7 @@ fn draft_version_compat() { Some(Event::ConnectionLost { reason: ConnectionError::ApplicationClosed( ApplicationClose { error_code: VarInt(42), ref reason } )}) if reason == REASON); - assert_matches!( - pair.client_conn_mut(client_ch).poll(), - Some(Event::ConnectionLost { - reason: ConnectionError::LocallyClosed - }) - ); + assert_matches!(pair.client_conn_mut(client_ch).poll(), None); assert_eq!(pair.client.known_connections(), 0); assert_eq!(pair.client.known_cids(), 0); assert_eq!(pair.server.known_connections(), 0); @@ -448,10 +438,8 @@ fn reject_self_signed_server_cert() { pair.drive(); - let a = pair.client_conn_mut(client_ch).poll(); - dbg!(&a); assert_matches!( - a, + pair.client_conn_mut(client_ch).poll(), Some(Event::ConnectionLost { reason: ConnectionError::ConnectionClosed(ref reason)}) if reason.error_code == TransportErrorCode::crypto(AlertDescription::UnknownCA.into()) ); @@ -1157,12 +1145,7 @@ fn instant_close_1() { .close(pair.time, VarInt(0), reason); pair.drive(); let server_ch = pair.server.assert_accept(); - assert_matches!( - pair.client_conn_mut(client_ch).poll(), - Some(Event::ConnectionLost { - reason: ConnectionError::LocallyClosed - }) - ); + assert_matches!(pair.client_conn_mut(client_ch).poll(), None); assert_matches!( pair.server_conn_mut(server_ch).poll(), Some(Event::ConnectionLost { @@ -1189,12 +1172,7 @@ fn instant_close_2() { .unwrap() .close(pair.time, VarInt(42), reason); pair.drive(); - assert_matches!( - pair.client_conn_mut(client_ch).poll(), - Some(Event::ConnectionLost { - reason: ConnectionError::LocallyClosed - }) - ); + assert_matches!(pair.client_conn_mut(client_ch).poll(), None); let server_ch = pair.server.assert_accept(); assert_matches!( pair.server_conn_mut(server_ch).poll(), From 219175b5fb69599939d3c7ee91fc0db1e9de6a39 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 28 Nov 2025 10:55:31 +0100 Subject: [PATCH 08/14] apply CR --- quinn-proto/src/connection/mod.rs | 54 +++++++++++++-------------- quinn-proto/src/connection/state.rs | 58 ++++++++++++----------------- 2 files changed, 51 insertions(+), 61 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 5b06f4aef..dc6ef6049 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -99,7 +99,7 @@ mod transmit_buf; use transmit_buf::TransmitBuf; mod state; -use state::{State, StateTyp}; +use state::{State, StateType}; /// Protocol state and logic for a single QUIC connection /// @@ -881,12 +881,12 @@ impl Connection { // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths // Check whether we need to send a close message - let close = match self.state.as_typ() { - StateTyp::Drained => { + let close = match self.state.as_type() { + StateType::Drained => { self.app_limited = true; return None; } - StateTyp::Draining | StateTyp::Closed => { + StateType::Draining | StateType::Closed => { // self.close is only reset once the associated packet had been // encoded successfully if !self.close { @@ -1210,9 +1210,9 @@ impl Connection { ); if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() { let max_frame_size = builder.frame_space_remaining(); - match self.state.as_typ() { - StateTyp::Closed => { - let reason = &self.state.as_closed().expect("checked").reason; + match self.state.as_type() { + StateType::Closed => { + let reason = &self.state.as_closed().expect("checked"); if space_id == SpaceId::Data || reason.is_transport_layer() { reason.encode(&mut builder.frame_space_mut(), max_frame_size) } else { @@ -1224,7 +1224,7 @@ impl Connection { .encode(&mut builder.frame_space_mut(), max_frame_size) } } - StateTyp::Draining => frame::ConnectionClose { + StateType::Draining => frame::ConnectionClose { error_code: TransportErrorCode::NO_ERROR, frame_type: None, reason: Bytes::new(), @@ -1783,7 +1783,7 @@ impl Connection { match timer { Timer::Conn(timer) => match timer { ConnTimer::Close => { - self.state.to_drained(None); + self.state.move_to_drained(None); self.endpoint_events.push_back(EndpointEventInner::Drained); } ConnTimer::Idle => { @@ -1954,7 +1954,7 @@ impl Connection { self.close_common(); self.set_close_timer(now); self.close = true; - self.state.to_closed_locally(reason); + self.state.move_to_closed_local(reason); } } @@ -3512,24 +3512,24 @@ impl Connection { // State transitions for error cases if let Err(conn_err) = result { match conn_err { - ConnectionError::ApplicationClosed(reason) => self.state.to_closed(reason), - ConnectionError::ConnectionClosed(reason) => self.state.to_closed(reason), + ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason), + ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason), ConnectionError::Reset | ConnectionError::TransportError(TransportError { code: TransportErrorCode::AEAD_LIMIT_REACHED, .. }) => { - self.state.to_drained(Some(conn_err)); + self.state.move_to_drained(Some(conn_err)); } ConnectionError::TimedOut => { unreachable!("timeouts aren't generated by packet processing"); } ConnectionError::TransportError(err) => { debug!("closing connection due to transport error: {}", err); - self.state.to_closed(err); + self.state.move_to_closed(err); } ConnectionError::VersionMismatch => { - self.state.to_draining(Some(conn_err)); + self.state.moved_to_draining(Some(conn_err)); } ConnectionError::LocallyClosed => { unreachable!("LocallyClosed isn't generated by packet processing"); @@ -3554,7 +3554,7 @@ impl Connection { } // Transmit CONNECTION_CLOSE if necessary - if matches!(self.state.as_typ(), StateTyp::Closed) { + if matches!(self.state.as_type(), StateType::Closed) { // If there is no PathData for this PathId the packet was for a brand new // path. It was a valid packet however, so the remote is valid and we want to // send CONNECTION_CLOSE. @@ -3582,8 +3582,8 @@ impl Connection { trace!(%path_id, ?number, "discarding packet for unknown path"); return Ok(()); } - let state = match self.state.as_typ() { - StateTyp::Established => { + let state = match self.state.as_type() { + StateType::Established => { match packet.header.space() { SpaceId::Data => { self.process_payload(now, remote, path_id, number.unwrap(), packet)? @@ -3597,7 +3597,7 @@ impl Connection { } return Ok(()); } - StateTyp::Closed => { + StateType::Closed => { for result in frame::Iter::new(packet.payload.freeze())? { let frame = match result { Ok(frame) => frame, @@ -3615,14 +3615,14 @@ impl Connection { if let Frame::Close(_error) = frame { trace!("draining"); - self.state.to_draining(None); + self.state.moved_to_draining(None); break; } } return Ok(()); } - StateTyp::Draining | StateTyp::Drained => return Ok(()), - StateTyp::Handshake => self.state.as_handshake_mut().expect("checked"), + StateType::Draining | StateType::Drained => return Ok(()), + StateType::Handshake => self.state.as_handshake_mut().expect("checked"), }; match packet.header { @@ -3796,7 +3796,7 @@ impl Connection { } self.events.push_back(Event::Connected); - self.state.to_established(); + self.state.move_to_established(); trace!("established"); // Multipath can only be enabled after the state has reached Established. @@ -3820,7 +3820,7 @@ impl Connection { self.rem_handshake_cid = rem_cid; self.orig_rem_cid = rem_cid; state.rem_cid_set = true; - self.state.to_handshake(state); + self.state.move_to_handshake(state); } else if rem_cid != self.rem_handshake_cid { debug!( "discarding packet with mismatched remote CID: {} != {}", @@ -3918,7 +3918,7 @@ impl Connection { self.on_path_ack_received(now, packet.header.space(), ack)?; } Frame::Close(reason) => { - self.state.to_draining(Some(reason.into())); + self.state.moved_to_draining(Some(reason.into())); return Ok(()); } _ => { @@ -4610,7 +4610,7 @@ impl Connection { self.streams.queue_max_stream_id(pending); if let Some(reason) = close { - self.state.to_draining(Some(reason.into())); + self.state.moved_to_draining(Some(reason.into())); self.close = true; } @@ -5809,7 +5809,7 @@ impl Connection { /// Terminate the connection instantly, without sending a close packet fn kill(&mut self, reason: ConnectionError) { self.close_common(); - self.state.to_drained(Some(reason)); + self.state.move_to_drained(Some(reason)); self.endpoint_events.push_back(EndpointEventInner::Drained); } diff --git a/quinn-proto/src/connection/state.rs b/quinn-proto/src/connection/state.rs index 0ade02ec4..77e849c84 100644 --- a/quinn-proto/src/connection/state.rs +++ b/quinn-proto/src/connection/state.rs @@ -26,7 +26,7 @@ impl State { } } - pub(super) fn as_closed(&self) -> Option<&Closed> { + pub(super) fn as_closed(&self) -> Option<&Close> { if let InnerState::Closed { ref remote_reason, .. } = self.inner @@ -51,15 +51,15 @@ impl State { } } - pub(super) fn to_handshake(&mut self, hs: Handshake) { + pub(super) fn move_to_handshake(&mut self, hs: Handshake) { self.inner = InnerState::Handshake(hs); } - pub(super) fn to_established(&mut self) { + pub(super) fn move_to_established(&mut self) { self.inner = InnerState::Established; } - pub(super) fn to_drained(&mut self, error: Option) { + pub(super) fn move_to_drained(&mut self, error: Option) { dbg!(&self, &error); let (error, is_local) = if let Some(error) = error { (Some(error), false) @@ -76,7 +76,7 @@ impl State { None } else { *error_read = true; - let error = match remote_reason.clone().reason.into() { + let error = match remote_reason.clone().into() { ConnectionError::ConnectionClosed(close) => { if close.error_code == TransportErrorCode::PROTOCOL_VIOLATION { ConnectionError::TransportError(close.error_code.into()) @@ -96,7 +96,7 @@ impl State { self.inner = InnerState::Drained { error, is_local }; } - pub(super) fn to_draining(&mut self, error: Option) { + pub(super) fn moved_to_draining(&mut self, error: Option) { dbg!(&self, &error); assert!( matches!( @@ -104,7 +104,7 @@ impl State { InnerState::Handshake(_) | InnerState::Established | InnerState::Closed { .. } ), "invalid state transition {:?} -> draining", - self.as_typ() + self.as_type() ); let is_local = self.is_local_close(); self.inner = InnerState::Draining { error, is_local }; @@ -120,38 +120,34 @@ impl State { } } - pub(super) fn to_closed>(&mut self, reason: R) { + pub(super) fn move_to_closed>(&mut self, reason: R) { assert!( matches!( self.inner, InnerState::Handshake(_) | InnerState::Established ), "invalid state transition {:?} -> closed", - self.as_typ() + self.as_type() ); self.inner = InnerState::Closed { error_read: false, - remote_reason: Closed { - reason: reason.into(), - }, + remote_reason: reason.into(), is_local: false, }; } - pub(super) fn to_closed_locally>(&mut self, reason: R) { + pub(super) fn move_to_closed_local>(&mut self, reason: R) { assert!( matches!( self.inner, InnerState::Handshake(_) | InnerState::Established ), - "invalid state transition {:?} -> closed", - self.as_typ() + "invalid state transition {:?} -> closed (local)", + self.as_type() ); self.inner = InnerState::Closed { error_read: false, - remote_reason: Closed { - reason: reason.into(), - }, + remote_reason: reason.into(), is_local: true, }; } @@ -203,7 +199,7 @@ impl State { if *local_reason { None } else { - Some(remote_reason.clone().reason.into()) + Some(remote_reason.clone().into()) } } } @@ -211,20 +207,20 @@ impl State { } } - pub(super) fn as_typ(&self) -> StateTyp { + pub(super) fn as_type(&self) -> StateType { match self.inner { - InnerState::Handshake(_) => StateTyp::Handshake, - InnerState::Established => StateTyp::Established, - InnerState::Closed { .. } => StateTyp::Closed, - InnerState::Draining { .. } => StateTyp::Draining, - InnerState::Drained { .. } => StateTyp::Drained, + InnerState::Handshake(_) => StateType::Handshake, + InnerState::Established => StateType::Established, + InnerState::Closed { .. } => StateType::Closed, + InnerState::Draining { .. } => StateType::Draining, + InnerState::Drained { .. } => StateType::Drained, } } } #[allow(unreachable_pub)] // fuzzing only #[derive(Debug, Clone)] -pub enum StateTyp { +pub enum StateType { Handshake, Established, Closed, @@ -239,7 +235,7 @@ enum InnerState { // TODO: should this be split into `ClosedLocal` and `ClosedRemote`? Closed { /// The reason the remote closed the connection, or the reason we are sending to the remote. - remote_reason: Closed, + remote_reason: Close, /// Set to true if we closed the connection locally is_local: bool, /// Did we read this as error already? @@ -290,12 +286,6 @@ pub struct Handshake { pub(super) allow_server_migration: bool, } -#[allow(unreachable_pub)] // fuzzing only -#[derive(Debug, Clone)] -pub struct Closed { - pub(super) reason: Close, -} - #[cfg(test)] mod tests { use super::*; @@ -309,7 +299,7 @@ mod tests { is_local: true, }, }; - state.to_drained(None); + state.move_to_drained(None); assert_eq!(state.take_error(), Some(ConnectionError::Reset)); } } From 4ac8f0321273d031183357bea9808776ad18d3b4 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 28 Nov 2025 11:38:22 +0100 Subject: [PATCH 09/14] cleanup --- quinn-proto/src/connection/mod.rs | 9 ++++---- quinn-proto/src/connection/state.rs | 32 ++++++----------------------- 2 files changed, 10 insertions(+), 31 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index dc6ef6049..2d5b5fd35 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -486,7 +486,6 @@ impl Connection { return Some(Event::Stream(event)); } - dbg!(&self.state); if let Some(reason) = self.state.take_error() { return Some(Event::ConnectionLost { reason }); } @@ -3529,7 +3528,7 @@ impl Connection { self.state.move_to_closed(err); } ConnectionError::VersionMismatch => { - self.state.moved_to_draining(Some(conn_err)); + self.state.move_to_draining(Some(conn_err)); } ConnectionError::LocallyClosed => { unreachable!("LocallyClosed isn't generated by packet processing"); @@ -3615,7 +3614,7 @@ impl Connection { if let Frame::Close(_error) = frame { trace!("draining"); - self.state.moved_to_draining(None); + self.state.move_to_draining(None); break; } } @@ -3918,7 +3917,7 @@ impl Connection { self.on_path_ack_received(now, packet.header.space(), ack)?; } Frame::Close(reason) => { - self.state.moved_to_draining(Some(reason.into())); + self.state.move_to_draining(Some(reason.into())); return Ok(()); } _ => { @@ -4610,7 +4609,7 @@ impl Connection { self.streams.queue_max_stream_id(pending); if let Some(reason) = close { - self.state.moved_to_draining(Some(reason.into())); + self.state.move_to_draining(Some(reason.into())); self.close = true; } diff --git a/quinn-proto/src/connection/state.rs b/quinn-proto/src/connection/state.rs index 77e849c84..909cb7720 100644 --- a/quinn-proto/src/connection/state.rs +++ b/quinn-proto/src/connection/state.rs @@ -6,6 +6,7 @@ use crate::{ConnectionError, TransportErrorCode}; #[allow(unreachable_pub)] // fuzzing only #[derive(Debug, Clone)] pub struct State { + /// Nested `InnerState` to enforce all state transitions are done in this module inner: InnerState, } @@ -37,9 +38,8 @@ impl State { } } - #[allow(unreachable_pub)] // fuzzing only #[cfg(test)] - pub fn established() -> Self { + pub(super) fn established() -> Self { Self { inner: InnerState::Established, } @@ -60,7 +60,6 @@ impl State { } pub(super) fn move_to_drained(&mut self, error: Option) { - dbg!(&self, &error); let (error, is_local) = if let Some(error) = error { (Some(error), false) } else { @@ -96,8 +95,7 @@ impl State { self.inner = InnerState::Drained { error, is_local }; } - pub(super) fn moved_to_draining(&mut self, error: Option) { - dbg!(&self, &error); + pub(super) fn move_to_draining(&mut self, error: Option) { assert!( matches!( self.inner, @@ -218,9 +216,8 @@ impl State { } } -#[allow(unreachable_pub)] // fuzzing only #[derive(Debug, Clone)] -pub enum StateType { +pub(super) enum StateType { Handshake, Established, Closed, @@ -232,7 +229,6 @@ pub enum StateType { enum InnerState { Handshake(Handshake), Established, - // TODO: should this be split into `ClosedLocal` and `ClosedRemote`? Closed { /// The reason the remote closed the connection, or the reason we are sending to the remote. remote_reason: Close, @@ -244,12 +240,14 @@ enum InnerState { Draining { /// Why the connection was lost, if it has been error: Option, + /// Set to true if we closed the connection locally is_local: bool, }, /// Waiting for application to call close so we can dispose of the resources Drained { /// Why the connection was lost, if it has been error: Option, + /// Set to true if we closed the connection locally is_local: bool, }, } @@ -285,21 +283,3 @@ pub struct Handshake { /// other than using the preferred-address transport parameter. pub(super) allow_server_migration: bool, } - -#[cfg(test)] -mod tests { - use super::*; - - /// This makes sure that the assumption of error set if drained holds up. - #[test] - fn test_always_error_if_drained() { - let mut state = State { - inner: InnerState::Draining { - error: Some(ConnectionError::Reset), - is_local: true, - }, - }; - state.move_to_drained(None); - assert_eq!(state.take_error(), Some(ConnectionError::Reset)); - } -} From 300bf95b01d3a950a0c36b01b09e665846a2c814 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 28 Nov 2025 13:08:29 +0100 Subject: [PATCH 10/14] fixup: undo test changes --- quinn-proto/src/tests/mod.rs | 10 +++------- quinn/src/tests.rs | 4 +--- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index ea1142626..776e203bf 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -886,10 +886,8 @@ fn alpn_mismatch() { ])))); pair.drive(); - let res = pair.client_conn_mut(client_ch).poll(); - dbg!(&res); assert_matches!( - res, + pair.client_conn_mut(client_ch).poll(), Some(Event::ConnectionLost { reason: ConnectionError::ConnectionClosed(err) }) if err.error_code == TransportErrorCode::crypto(0x78) ); } @@ -1137,12 +1135,11 @@ fn instant_close_1() { let mut pair = Pair::default(); info!("connecting"); let client_ch = pair.begin_connect(client_config()); - let reason = Bytes::new(); pair.client .connections .get_mut(&client_ch) .unwrap() - .close(pair.time, VarInt(0), reason); + .close(pair.time, VarInt(0), Bytes::new()); pair.drive(); let server_ch = pair.server.assert_accept(); assert_matches!(pair.client_conn_mut(client_ch).poll(), None); @@ -1165,12 +1162,11 @@ fn instant_close_2() { let client_ch = pair.begin_connect(client_config()); // Unlike `instant_close`, the server sees a valid Initial packet first. pair.drive_client(); - let reason = Bytes::new(); pair.client .connections .get_mut(&client_ch) .unwrap() - .close(pair.time, VarInt(42), reason); + .close(pair.time, VarInt(42), Bytes::new()); pair.drive(); assert_matches!(pair.client_conn_mut(client_ch).poll(), None); let server_ch = pair.server.assert_accept(); diff --git a/quinn/src/tests.rs b/quinn/src/tests.rs index 8509cfddd..f286cf3cd 100755 --- a/quinn/src/tests.rs +++ b/quinn/src/tests.rs @@ -105,9 +105,7 @@ async fn close_endpoint() { .unwrap(); endpoint.close(0u32.into(), &[]); match conn.await { - Err( - crate::ConnectionError::LocallyClosed | crate::ConnectionError::ApplicationClosed(_), - ) => (), + Err(crate::ConnectionError::LocallyClosed) => (), Err(e) => panic!("unexpected error: {e}"), Ok(_) => { panic!("unexpected success"); From 9dcc6a06f58d3f7f33d9824eb978af118b40aea0 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 28 Nov 2025 13:24:24 +0100 Subject: [PATCH 11/14] fix: preserve transporterror knowledge --- quinn-proto/src/connection/mod.rs | 3 +- quinn-proto/src/connection/state.rs | 64 ++++++++++++++++++++++++++--- quinn-proto/src/tests/mod.rs | 12 +++--- 3 files changed, 66 insertions(+), 13 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 2d5b5fd35..9c62aad53 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1211,7 +1211,8 @@ impl Connection { let max_frame_size = builder.frame_space_remaining(); match self.state.as_type() { StateType::Closed => { - let reason = &self.state.as_closed().expect("checked"); + let reason: Close = + self.state.as_closed().expect("checked").clone().into(); if space_id == SpaceId::Data || reason.is_transport_layer() { reason.encode(&mut builder.frame_space_mut(), max_frame_size) } else { diff --git a/quinn-proto/src/connection/state.rs b/quinn-proto/src/connection/state.rs index 909cb7720..3e1139b10 100644 --- a/quinn-proto/src/connection/state.rs +++ b/quinn-proto/src/connection/state.rs @@ -1,7 +1,9 @@ use bytes::Bytes; use crate::frame::Close; -use crate::{ConnectionError, TransportErrorCode}; +use crate::{ + ApplicationClose, ConnectionClose, ConnectionError, TransportError, TransportErrorCode, +}; #[allow(unreachable_pub)] // fuzzing only #[derive(Debug, Clone)] @@ -27,7 +29,7 @@ impl State { } } - pub(super) fn as_closed(&self) -> Option<&Close> { + pub(super) fn as_closed(&self) -> Option<&CloseReason> { if let InnerState::Closed { ref remote_reason, .. } = self.inner @@ -118,7 +120,7 @@ impl State { } } - pub(super) fn move_to_closed>(&mut self, reason: R) { + pub(super) fn move_to_closed>(&mut self, reason: R) { assert!( matches!( self.inner, @@ -134,7 +136,7 @@ impl State { }; } - pub(super) fn move_to_closed_local>(&mut self, reason: R) { + pub(super) fn move_to_closed_local>(&mut self, reason: R) { assert!( matches!( self.inner, @@ -225,13 +227,65 @@ pub(super) enum StateType { Drained, } +#[derive(Debug, Clone)] +pub(super) enum CloseReason { + TransportError(TransportError), + Connection(ConnectionClose), + Application(ApplicationClose), +} + +impl From for CloseReason { + fn from(x: TransportError) -> Self { + Self::TransportError(x) + } +} +impl From for CloseReason { + fn from(x: ConnectionClose) -> Self { + Self::Connection(x) + } +} +impl From for CloseReason { + fn from(x: ApplicationClose) -> Self { + Self::Application(x) + } +} + +impl From for CloseReason { + fn from(value: Close) -> Self { + match value { + Close::Application(reason) => Self::Application(reason), + Close::Connection(reason) => Self::Connection(reason), + } + } +} + +impl From for ConnectionError { + fn from(value: CloseReason) -> Self { + match value { + CloseReason::TransportError(err) => Self::TransportError(err), + CloseReason::Connection(reason) => Self::ConnectionClosed(reason), + CloseReason::Application(reason) => Self::ApplicationClosed(reason), + } + } +} + +impl From for Close { + fn from(value: CloseReason) -> Self { + match value { + CloseReason::TransportError(err) => Self::Connection(err.into()), + CloseReason::Connection(reason) => Self::Connection(reason), + CloseReason::Application(reason) => Self::Application(reason), + } + } +} + #[derive(Debug, Clone)] enum InnerState { Handshake(Handshake), Established, Closed { /// The reason the remote closed the connection, or the reason we are sending to the remote. - remote_reason: Close, + remote_reason: CloseReason, /// Set to true if we closed the connection locally is_local: bool, /// Did we read this as error already? diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index 776e203bf..c9fe23c9a 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -438,11 +438,9 @@ fn reject_self_signed_server_cert() { pair.drive(); - assert_matches!( - pair.client_conn_mut(client_ch).poll(), - Some(Event::ConnectionLost { reason: ConnectionError::ConnectionClosed(ref reason)}) - if reason.error_code == TransportErrorCode::crypto(AlertDescription::UnknownCA.into()) - ); + assert_matches!(pair.client_conn_mut(client_ch).poll(), + Some(Event::ConnectionLost { reason: ConnectionError::TransportError(ref error)}) + if error.code == TransportErrorCode::crypto(AlertDescription::UnknownCA.into())); } #[test] @@ -499,8 +497,8 @@ fn reject_missing_client_cert() { Some(Event::HandshakeDataReady) ); assert_matches!(pair.server_conn_mut(server_ch).poll(), - Some(Event::ConnectionLost { reason: ConnectionError::ConnectionClosed(ref close)}) - if close.error_code == TransportErrorCode::crypto(AlertDescription::CertificateRequired.into())); + Some(Event::ConnectionLost { reason: ConnectionError::TransportError(ref error)}) + if error.code == TransportErrorCode::crypto(AlertDescription::CertificateRequired.into())); } #[test] From 881f756387224adfee491be0fe2af3e2f2b5d18c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diva=20Mart=C3=ADnez?= Date: Sat, 29 Nov 2025 20:22:22 -0500 Subject: [PATCH 12/14] simplify `move_to_drained` --- quinn-proto/src/connection/state.rs | 32 +++++++++++------------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/quinn-proto/src/connection/state.rs b/quinn-proto/src/connection/state.rs index 3e1139b10..2311f3cdf 100644 --- a/quinn-proto/src/connection/state.rs +++ b/quinn-proto/src/connection/state.rs @@ -68,27 +68,19 @@ impl State { let error = match &mut self.inner { InnerState::Draining { error, .. } => error.take(), InnerState::Drained { .. } => panic!("invalid state transition drained -> drained"), - InnerState::Closed { - remote_reason, - error_read, - .. - } => { - if *error_read { - None - } else { - *error_read = true; - let error = match remote_reason.clone().into() { - ConnectionError::ConnectionClosed(close) => { - if close.error_code == TransportErrorCode::PROTOCOL_VIOLATION { - ConnectionError::TransportError(close.error_code.into()) - } else { - ConnectionError::ConnectionClosed(close) - } + InnerState::Closed { error_read, .. } if *error_read => None, + InnerState::Closed { remote_reason, .. } => { + let error = match remote_reason.clone().into() { + ConnectionError::ConnectionClosed(close) => { + if close.error_code == TransportErrorCode::PROTOCOL_VIOLATION { + ConnectionError::TransportError(close.error_code.into()) + } else { + ConnectionError::ConnectionClosed(close) } - e => e, - }; - Some(error) - } + } + e => e, + }; + Some(error) } InnerState::Handshake(_) | InnerState::Established => None, }; From f272fdc5271757af2158a34eab1975c5bd108c6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diva=20Mart=C3=ADnez?= Date: Sat, 29 Nov 2025 21:04:06 -0500 Subject: [PATCH 13/14] document the panics, add periods at the end of sentences :D --- quinn-proto/src/connection/state.rs | 40 +++++++++++++++++++---------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/quinn-proto/src/connection/state.rs b/quinn-proto/src/connection/state.rs index 2311f3cdf..09a682bea 100644 --- a/quinn-proto/src/connection/state.rs +++ b/quinn-proto/src/connection/state.rs @@ -8,7 +8,7 @@ use crate::{ #[allow(unreachable_pub)] // fuzzing only #[derive(Debug, Clone)] pub struct State { - /// Nested `InnerState` to enforce all state transitions are done in this module + /// Nested [`InnerState`] to enforce all state transitions are done in this module. inner: InnerState, } @@ -47,9 +47,9 @@ impl State { } } - pub(super) fn handshake(handshake: Handshake) -> Self { + pub(super) fn handshake(hs: Handshake) -> Self { Self { - inner: InnerState::Handshake(handshake), + inner: InnerState::Handshake(hs), } } @@ -61,6 +61,9 @@ impl State { self.inner = InnerState::Established; } + /// Moves to a draining state. + /// + /// Panics if the state was already drained. pub(super) fn move_to_drained(&mut self, error: Option) { let (error, is_local) = if let Some(error) = error { (Some(error), false) @@ -89,6 +92,9 @@ impl State { self.inner = InnerState::Drained { error, is_local }; } + /// Moves to a draining state. + /// + /// Panics if the state is already draining or drained. pub(super) fn move_to_draining(&mut self, error: Option) { assert!( matches!( @@ -112,6 +118,9 @@ impl State { } } + /// Moves to a closed state after a remote error is received. + /// + /// Panics if the state is later than established. pub(super) fn move_to_closed>(&mut self, reason: R) { assert!( matches!( @@ -128,6 +137,9 @@ impl State { }; } + /// Moves to a closed state after a local error. + /// + /// Panics if the state is later than established. pub(super) fn move_to_closed_local>(&mut self, reason: R) { assert!( matches!( @@ -278,22 +290,22 @@ enum InnerState { Closed { /// The reason the remote closed the connection, or the reason we are sending to the remote. remote_reason: CloseReason, - /// Set to true if we closed the connection locally + /// Set to true if we closed the connection locally. is_local: bool, /// Did we read this as error already? error_read: bool, }, Draining { - /// Why the connection was lost, if it has been + /// Why the connection was lost, if it has been. error: Option, - /// Set to true if we closed the connection locally + /// Set to true if we closed the connection locally. is_local: bool, }, - /// Waiting for application to call close so we can dispose of the resources + /// Waiting for application to call close so we can dispose of the resources. Drained { - /// Why the connection was lost, if it has been + /// Why the connection was lost, if it has been. error: Option, - /// Set to true if we closed the connection locally + /// Set to true if we closed the connection locally. is_local: bool, }, } @@ -301,19 +313,19 @@ enum InnerState { #[allow(unreachable_pub)] // fuzzing only #[derive(Debug, Clone)] pub struct Handshake { - /// Whether the remote CID has been set by the peer yet + /// Whether the remote CID has been set by the peer yet. /// - /// Always set for servers + /// Always set for servers. pub(super) rem_cid_set: bool, /// Stateless retry token received in the first Initial by a server. /// /// Must be present in every Initial. Always empty for clients. pub(super) expected_token: Bytes, - /// First cryptographic message + /// First cryptographic message. /// - /// Only set for clients + /// Only set for clients. pub(super) client_hello: Option, - /// Whether the server address is allowed to migrate + /// Whether the server address is allowed to migrate. /// /// We allow the server to migrate during the handshake as long as we have not /// received an authenticated handshake packet: it can send a response from a From a17f18512e8fb02602a67e5a69f6d1c1af3743f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diva=20Mart=C3=ADnez?= Date: Sat, 29 Nov 2025 21:09:33 -0500 Subject: [PATCH 14/14] fix missing doc link --- quinn-proto/src/connection/state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quinn-proto/src/connection/state.rs b/quinn-proto/src/connection/state.rs index 09a682bea..7b118de10 100644 --- a/quinn-proto/src/connection/state.rs +++ b/quinn-proto/src/connection/state.rs @@ -335,7 +335,7 @@ pub struct Handshake { /// /// This is only ever set to true if for a client which hasn't yet received an /// authenticated handshake packet. It is set back to false in - /// [`Connection::on_packet_authenticated`]. + /// [`super::Connection::on_packet_authenticated`]. /// /// THIS IS NOT RFC 9000 COMPLIANT! A server is not allowed to migrate addresses, /// other than using the preferred-address transport parameter.