From 8c91068e215f4ebf0875a34807c888efa33e390e Mon Sep 17 00:00:00 2001 From: Frando Date: Sat, 22 Nov 2025 17:41:56 +0100 Subject: [PATCH 1/2] feat: add vantage point to qlog traces --- quinn-proto/src/config/mod.rs | 2 +- quinn-proto/src/config/transport.rs | 20 +++++++++++++++----- quinn-proto/src/lib.rs | 2 +- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/quinn-proto/src/config/mod.rs b/quinn-proto/src/config/mod.rs index 6d2351a27..311806162 100644 --- a/quinn-proto/src/config/mod.rs +++ b/quinn-proto/src/config/mod.rs @@ -27,7 +27,7 @@ use crate::{ mod transport; #[cfg(feature = "qlog")] -pub use transport::QlogConfig; +pub use transport::{QlogConfig, VantagePointType}; pub use transport::{AckFrequencyConfig, IdleTimeout, MtuDiscoveryConfig, TransportConfig}; #[cfg(doc)] diff --git a/quinn-proto/src/config/transport.rs b/quinn-proto/src/config/transport.rs index d2de9ac0a..4b1f83a8e 100644 --- a/quinn-proto/src/config/transport.rs +++ b/quinn-proto/src/config/transport.rs @@ -6,6 +6,8 @@ use std::{ #[cfg(feature = "qlog")] use std::{io, sync::Mutex, time::Instant}; +#[cfg(feature = "qlog")] +pub use qlog::VantagePointType; #[cfg(feature = "qlog")] use qlog::streamer::QlogStreamer; @@ -703,6 +705,7 @@ pub struct QlogConfig { title: Option, description: Option, start_time: Instant, + vantage_point: qlog::VantagePoint, } #[cfg(feature = "qlog")] @@ -731,17 +734,19 @@ impl QlogConfig { self } + /// Vantage point for this trace + pub fn vantage_point(&mut self, vantage_point: VantagePointType, name: Option) { + self.vantage_point.name = name; + self.vantage_point.ty = vantage_point; + } + /// Construct the [`QlogStream`] described by this configuration pub fn into_stream(self) -> Option { use tracing::warn; let writer = self.writer?; let trace = qlog::TraceSeq::new( - qlog::VantagePoint { - name: None, - ty: qlog::VantagePointType::Unknown, - flow: None, - }, + self.vantage_point, self.title.clone(), self.description.clone(), Some(qlog::Configuration { @@ -780,6 +785,11 @@ impl Default for QlogConfig { title: None, description: None, start_time: Instant::now(), + vantage_point: qlog::VantagePoint { + name: None, + ty: qlog::VantagePointType::Unknown, + flow: None, + }, } } } diff --git a/quinn-proto/src/lib.rs b/quinn-proto/src/lib.rs index 9f3c3711e..5e5cd018e 100644 --- a/quinn-proto/src/lib.rs +++ b/quinn-proto/src/lib.rs @@ -58,7 +58,7 @@ mod config; #[cfg(doc)] pub use config::DEFAULT_CONCURRENT_MULTIPATH_PATHS_WHEN_ENABLED; #[cfg(feature = "qlog")] -pub use config::QlogConfig; +pub use config::{QlogConfig, VantagePointType}; pub use config::{ AckFrequencyConfig, ClientConfig, ConfigError, EndpointConfig, IdleTimeout, MtuDiscoveryConfig, ServerConfig, StdSystemTime, TimeSource, TransportConfig, ValidationTokenConfig, From b9e83dc0fa72efbdd9d6c86cc4ef7705e96a7ed3 Mon Sep 17 00:00:00 2001 From: Frando Date: Sun, 23 Nov 2025 11:40:12 +0100 Subject: [PATCH 2/2] feat: record outgoing quic frames in qlog feat: add ConnectionStarted to qlog feat: track received packets in qlog fix: cid groups, padding size, cleanups fixup vantage_point method signature chore: clippy --- quinn-proto/src/config/mod.rs | 2 +- quinn-proto/src/config/transport.rs | 9 +- quinn-proto/src/connection/mod.rs | 270 +++++++++++--- quinn-proto/src/connection/packet_builder.rs | 45 ++- quinn-proto/src/connection/qlog.rs | 360 ++++++++++++++++--- quinn-proto/src/connection/streams/state.rs | 59 ++- quinn-proto/src/frame.rs | 12 + quinn-proto/src/lib.rs | 4 +- quinn-proto/src/packet.rs | 11 + 9 files changed, 661 insertions(+), 111 deletions(-) diff --git a/quinn-proto/src/config/mod.rs b/quinn-proto/src/config/mod.rs index 311806162..e2d81de77 100644 --- a/quinn-proto/src/config/mod.rs +++ b/quinn-proto/src/config/mod.rs @@ -26,9 +26,9 @@ use crate::{ }; mod transport; +pub use transport::{AckFrequencyConfig, IdleTimeout, MtuDiscoveryConfig, TransportConfig}; #[cfg(feature = "qlog")] pub use transport::{QlogConfig, VantagePointType}; -pub use transport::{AckFrequencyConfig, IdleTimeout, MtuDiscoveryConfig, TransportConfig}; #[cfg(doc)] pub use transport::DEFAULT_CONCURRENT_MULTIPATH_PATHS_WHEN_ENABLED; diff --git a/quinn-proto/src/config/transport.rs b/quinn-proto/src/config/transport.rs index 4b1f83a8e..1196b90ae 100644 --- a/quinn-proto/src/config/transport.rs +++ b/quinn-proto/src/config/transport.rs @@ -735,9 +735,14 @@ impl QlogConfig { } /// Vantage point for this trace - pub fn vantage_point(&mut self, vantage_point: VantagePointType, name: Option) { + pub fn vantage_point( + &mut self, + vantage_point: VantagePointType, + name: Option, + ) -> &mut Self { self.vantage_point.name = name; self.vantage_point.ty = vantage_point; + self } /// Construct the [`QlogStream`] described by this configuration @@ -763,7 +768,7 @@ impl QlogConfig { None, self.start_time, trace, - qlog::events::EventImportance::Core, + qlog::events::EventImportance::Extra, writer, ); diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index d1bcc7314..f774d6685 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -11,11 +11,15 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; use frame::StreamMetaVec; +#[cfg(feature = "qlog")] +use ::qlog::events::quic::{AckedRanges, QuicFrame}; use rand::{Rng, SeedableRng, rngs::StdRng}; use rustc_hash::{FxHashMap, FxHashSet}; use thiserror::Error; use tracing::{debug, error, trace, trace_span, warn}; +#[cfg(feature = "qlog")] +use crate::FrameType; use crate::{ Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError, @@ -25,7 +29,10 @@ use crate::{ coding::BufMutExt, config::{ServerConfig, TransportConfig}, congestion::Controller, - connection::timer::{ConnTimer, PathTimer}, + connection::{ + qlog::{QlogRecvPacket, QlogSentPacket}, + timer::{ConnTimer, PathTimer}, + }, crypto::{self, KeyPair, Keys, PacketKey}, frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr}, iroh_hp, @@ -456,6 +463,14 @@ impl Connection { this.write_crypto(); this.init_0rtt(); } + this.config.qlog_sink.emit_connection_started( + now, + loc_cid, + rem_cid, + remote, + local_ip, + this.initial_dst_cid, + ); this } @@ -1149,6 +1164,7 @@ impl Connection { prev.update_unacked = false; } + let mut qlog = QlogSentPacket::default(); let mut builder = PacketBuilder::new( now, space_id, @@ -1157,6 +1173,7 @@ impl Connection { &mut transmit, can_send.other, self, + &mut qlog, )?; last_packet_number = Some(builder.exact_number); coalesce = coalesce && !builder.short_header; @@ -1197,6 +1214,7 @@ impl Connection { is_multipath_enabled, &mut builder.frame_space_mut(), &mut self.stats, + &mut qlog, ); } @@ -1209,31 +1227,47 @@ impl Connection { ); if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() { let max_frame_size = builder.frame_space_remaining(); - match self.state { + #[allow(unused)] + let error_code = match self.state { State::Closed(state::Closed { ref reason }) => { if space_id == SpaceId::Data || reason.is_transport_layer() { - reason.encode(&mut builder.frame_space_mut(), max_frame_size) + reason.encode(&mut builder.frame_space_mut(), max_frame_size); + reason.error_code() } else { + let error_code = TransportErrorCode::APPLICATION_ERROR; frame::ConnectionClose { - error_code: TransportErrorCode::APPLICATION_ERROR, + error_code, frame_type: None, reason: Bytes::new(), } - .encode(&mut builder.frame_space_mut(), max_frame_size) + .encode(&mut builder.frame_space_mut(), max_frame_size); + error_code.into() } } - State::Draining => frame::ConnectionClose { - error_code: TransportErrorCode::NO_ERROR, - frame_type: None, - reason: Bytes::new(), + State::Draining => { + let error_code = TransportErrorCode::NO_ERROR; + frame::ConnectionClose { + error_code, + frame_type: None, + reason: Bytes::new(), + } + .encode(&mut builder.frame_space_mut(), max_frame_size); + error_code.into() } - .encode(&mut builder.frame_space_mut(), max_frame_size), _ => unreachable!( "tried to make a close packet when the connection wasn't closed" ), - } + }; + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::ConnectionClose { + error_space: None, + error_code: Some(error_code), + error_code_value: None, + reason: None, + trigger_frame_type: None, + }); } - builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); + builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog); if space_id == self.highest_space { // Don't send another close packet. Even with multipath we only send // CONNECTION_CLOSE on a single path since we expect our paths to work. @@ -1261,6 +1295,8 @@ impl Connection { builder .frame_space_mut() .write(frame::FrameType::PATH_RESPONSE); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::PathResponse { data: None }); builder.frame_space_mut().write(token); self.stats.frame_tx.path_response += 1; builder.finish_and_track( @@ -1272,6 +1308,7 @@ impl Connection { ..SentFrames::default() }, PadDatagram::ToMinMtu, + qlog, ); self.stats.udp_tx.on_sent(1, transmit.len()); return Some(Transmit { @@ -1295,6 +1332,7 @@ impl Connection { path_exclusive_only, &mut builder.frame_space_mut(), pn, + &mut qlog, ) }; @@ -1345,7 +1383,7 @@ impl Connection { { // We can append/coalesce the next packet into the current // datagram. Finish the current packet without adding extra padding. - builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No); + builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog); } else { // We need a new datagram for the next packet. Finish the current // packet with padding. @@ -1365,7 +1403,14 @@ impl Connection { "GSO truncated by demand for {} padding bytes", builder.buf.datagram_remaining_mut() - builder.predict_packet_end() ); - builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No); + builder.finish_and_track( + now, + self, + path_id, + sent_frames, + PadDatagram::No, + qlog, + ); break; } @@ -1377,9 +1422,10 @@ impl Connection { path_id, sent_frames, PadDatagram::ToSegmentSize, + qlog, ); } else { - builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); + builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog); } if transmit.num_datagrams() == 1 { transmit.clip_datagram_size(); @@ -1401,7 +1447,7 @@ impl Connection { self.path_data(path_id).pto_count, &mut self.paths.get_mut(&path_id).unwrap().data, now, - self.orig_rem_cid, + self.initial_dst_cid, ); self.app_limited = transmit.is_empty() && !congestion_blocked; @@ -1422,6 +1468,7 @@ impl Connection { transmit.start_new_datagram_with_size(probe_size as usize); debug_assert_eq!(transmit.datagram_start_offset(), 0); + let mut qlog = QlogSentPacket::default(); let mut builder = PacketBuilder::new( now, space_id, @@ -1430,12 +1477,18 @@ impl Connection { &mut transmit, true, self, + &mut qlog, )?; // We implement MTU probes as ping packets padded up to the probe size trace!(?probe_size, "writing MTUD probe"); trace!("PING"); builder.frame_space_mut().write(frame::FrameType::PING); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::Ping { + length: None, + payload_length: None, + }); self.stats.frame_tx.ping += 1; // If supported by the peer, we want no delays to the probe's ACK @@ -1457,6 +1510,7 @@ impl Connection { path_id, sent_frames, PadDatagram::ToSize(probe_size), + qlog, ); self.path_stats @@ -1614,13 +1668,24 @@ impl Connection { // if a post-migration packet caused the CID to be retired, it's fair to pretend // this is sent first. debug_assert_eq!(buf.datagram_start_offset(), 0); - let mut builder = - PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, false, self)?; + let mut qlog = QlogSentPacket::default(); + let mut builder = PacketBuilder::new( + now, + SpaceId::Data, + path_id, + *prev_cid, + buf, + false, + self, + &mut qlog, + )?; trace!("validating previous path with PATH_CHALLENGE {:08x}", token); builder .frame_space_mut() .write(frame::FrameType::PATH_CHALLENGE); builder.frame_space_mut().write(token); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::PathChallenge { data: None }); self.stats.frame_tx.path_challenge += 1; // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame @@ -1629,7 +1694,7 @@ impl Connection { // sending a datagram of this size builder.pad_to(MIN_INITIAL_SIZE); - builder.finish(self, now); + builder.finish(self, now, qlog); self.stats.udp_tx.on_sent(1, buf.len()); Some(Transmit { @@ -1736,7 +1801,7 @@ impl Connection { path.data.pto_count, &mut path.data, now, - self.orig_rem_cid, + self.initial_dst_cid, ); } @@ -1843,7 +1908,7 @@ impl Connection { self.path_data(path_id).pto_count, &mut self.paths.get_mut(&path_id).unwrap().data, now, - self.orig_rem_cid, + self.initial_dst_cid, ); } PathTimer::PathValidation => { @@ -2734,7 +2799,7 @@ impl Connection { lost_send_time, pn_space, now, - self.orig_rem_cid, + self.initial_dst_cid, ); self.paths .get_mut(&path_id) @@ -3015,14 +3080,6 @@ impl Connection { // Update outgoing spin bit, inverting iff we're the client self.spin = self.side.is_client() ^ spin; } - - self.config.qlog_sink.emit_packet_received( - packet, - space_id, - !is_1rtt, - now, - self.orig_rem_cid, - ); } /// Resets the idle timeout timers @@ -3125,7 +3182,21 @@ impl Connection { false, ); - self.process_decrypted_packet(now, remote, path_id, Some(packet_number), packet.into())?; + let packet: Packet = packet.into(); + + let mut qlog = QlogRecvPacket::new(len); + #[cfg(feature = "qlog")] + qlog.header(&packet, Some(packet_number)); + + self.process_decrypted_packet( + now, + remote, + path_id, + Some(packet_number), + packet, + &mut qlog, + )?; + self.config.qlog_sink.emit_packet_received(self, qlog, now); if let Some(data) = remaining { self.handle_coalesced(now, remote, path_id, ecn, data); } @@ -3134,7 +3205,7 @@ impl Connection { self.path_data(path_id).pto_count, &mut self.paths.get_mut(&path_id).unwrap().data, now, - self.orig_rem_cid, + self.initial_dst_cid, ); Ok(()) @@ -3358,6 +3429,7 @@ impl Connection { ecn: Option, partial_decode: PartialDecode, ) { + let qlog = QlogRecvPacket::new(partial_decode.len()); if let Some(decoded) = packet_crypto::unprotect_header( partial_decode, &self.spaces, @@ -3371,6 +3443,7 @@ impl Connection { ecn, decoded.packet, decoded.stateless_reset, + qlog, ); } } @@ -3383,6 +3456,7 @@ impl Connection { ecn: Option, packet: Option, stateless_reset: bool, + mut qlog: QlogRecvPacket, ) { self.stats.udp_rx.ios += 1; if let Some(ref packet) = packet { @@ -3449,6 +3523,8 @@ impl Connection { } } Ok((packet, number)) => { + #[cfg(feature = "qlog")] + qlog.header(&packet, number); let span = match number { Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn), None => trace_span!("recv", space = ?packet.header.space()), @@ -3460,10 +3536,12 @@ impl Connection { .map(|pns| &mut pns.dedup); if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) { debug!("discarding possible duplicate packet"); + self.config.qlog_sink.emit_packet_received(self, qlog, now); return; } else if self.state.is_handshake() && packet.header.is_short() { // TODO: SHOULD buffer these to improve reordering tolerance. trace!("dropping short packet during handshake"); + self.config.qlog_sink.emit_packet_received(self, qlog, now); return; } else { if let Header::Initial(InitialHeader { ref token, .. }) = packet.header { @@ -3473,6 +3551,7 @@ impl Connection { // packets can be spoofed, so we discard rather than killing the // connection. warn!("discarding Initial with invalid retry token"); + self.config.qlog_sink.emit_packet_received(self, qlog, now); return; } } @@ -3501,7 +3580,11 @@ impl Connection { } } - self.process_decrypted_packet(now, remote, path_id, number, packet) + let res = self + .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog); + + self.config.qlog_sink.emit_packet_received(self, qlog, now); + res } } }; @@ -3568,6 +3651,7 @@ impl Connection { path_id: PathId, number: Option, packet: Packet, + qlog: &mut QlogRecvPacket, ) -> Result<(), ConnectionError> { if !self.paths.contains_key(&path_id) { // There is a chance this is a server side, first (for this path) packet, which would @@ -3580,10 +3664,10 @@ impl Connection { State::Established => { match packet.header.space() { SpaceId::Data => { - self.process_payload(now, remote, path_id, number.unwrap(), packet)? + self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)? } _ if packet.header.has_frames() => { - self.process_early_payload(now, path_id, packet)? + self.process_early_payload(now, path_id, packet, qlog)? } _ => { trace!("discarding unexpected pre-handshake packet"); @@ -3600,6 +3684,8 @@ impl Connection { continue; } }; + #[cfg(feature = "qlog")] + qlog.frame(&frame); if let Frame::Padding = frame { continue; @@ -3733,7 +3819,7 @@ impl Connection { } self.on_path_validated(path_id); - self.process_early_payload(now, path_id, packet)?; + self.process_early_payload(now, path_id, packet, qlog)?; if self.state.is_closed() { return Ok(()); } @@ -3827,7 +3913,7 @@ impl Connection { } let starting_space = self.highest_space; - self.process_early_payload(now, path_id, packet)?; + self.process_early_payload(now, path_id, packet, qlog)?; if self.side.is_server() && starting_space == SpaceId::Initial @@ -3851,7 +3937,7 @@ impl Connection { ty: LongType::ZeroRtt, .. } => { - self.process_payload(now, remote, path_id, number.unwrap(), packet)?; + self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?; Ok(()) } Header::VersionNegotiate { .. } => { @@ -3883,6 +3969,7 @@ impl Connection { now: Instant, path_id: PathId, packet: Packet, + #[allow(unused)] qlog: &mut QlogRecvPacket, ) -> Result<(), TransportError> { debug_assert_ne!(packet.header.space(), SpaceId::Data); debug_assert_eq!(path_id, PathId::ZERO); @@ -3890,6 +3977,8 @@ impl Connection { let mut ack_eliciting = false; for result in frame::Iter::new(packet.payload.freeze())? { let frame = result?; + #[cfg(feature = "qlog")] + qlog.frame(&frame); let span = match frame { Frame::Padding => continue, _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)), @@ -3949,6 +4038,7 @@ impl Connection { path_id: PathId, number: u64, packet: Packet, + #[allow(unused)] qlog: &mut QlogRecvPacket, ) -> Result<(), TransportError> { let payload = packet.payload.freeze(); let mut is_probing_packet = true; @@ -3960,6 +4050,8 @@ impl Connection { let mut migration_observed_addr = None; for result in frame::Iter::new(payload)? { let frame = result?; + #[cfg(feature = "qlog")] + qlog.frame(&frame); let span = match frame { Frame::Padding => continue, _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty), @@ -4800,6 +4892,7 @@ impl Connection { path_exclusive_only: bool, buf: &mut impl BufMut, pn: u64, + #[allow(unused)] qlog: &mut QlogSentPacket, ) -> SentFrames { let mut sent = SentFrames::default(); let is_multipath_negotiated = self.is_multipath_negotiated(); @@ -4815,6 +4908,8 @@ impl Connection { if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) { trace!("HANDSHAKE_DONE"); buf.write(frame::FrameType::HANDSHAKE_DONE); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::HandshakeDone); sent.retransmits.get_or_create().handshake_done = true; // This is just a u8 counter and the frame is typically just sent once self.stats.frame_tx.handshake_done = @@ -4866,6 +4961,8 @@ impl Connection { self.stats.frame_tx.observed_addr += 1; sent.retransmits.get_or_create().observed_addr = true; space.pending.observed_addr = false; + #[cfg(feature = "qlog")] + qlog.unknown_frame(&frame.get_type()); } } @@ -4875,6 +4972,11 @@ impl Connection { buf.write(frame::FrameType::PING); sent.non_retransmits = true; self.stats.frame_tx.ping += 1; + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::Ping { + length: None, + payload_length: None, + }); } // IMMEDIATE_ACK @@ -4883,6 +4985,8 @@ impl Connection { buf.write(frame::FrameType::IMMEDIATE_ACK); sent.non_retransmits = true; self.stats.frame_tx.immediate_ack += 1; + #[cfg(feature = "qlog")] + qlog.unknown_frame(&frame::FrameType::IMMEDIATE_ACK); } // ACK @@ -4908,6 +5012,7 @@ impl Connection { is_multipath_negotiated, buf, &mut self.stats, + qlog, ); } } @@ -4941,11 +5046,15 @@ impl Connection { self.ack_frequency .ack_frequency_sent(path_id, pn, max_ack_delay); self.stats.frame_tx.ack_frequency += 1; + #[cfg(feature = "qlog")] + qlog.unknown_frame(&frame::FrameType::ACK_FREQUENCY); } // PATH_CHALLENGE if buf.remaining_mut() > 9 && space_id == SpaceId::Data && path.send_new_challenge { path.send_new_challenge = false; + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::PathChallenge { data: None }); // Generate a new challenge every time we send a new PATH_CHALLENGE let token = self.rng.random(); @@ -4978,6 +5087,8 @@ impl Connection { let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no); if buf.remaining_mut() > frame.size() { frame.write(buf); + #[cfg(feature = "qlog")] + qlog.unknown_frame(&frame.get_type()); self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8); @@ -4998,6 +5109,8 @@ impl Connection { trace!("PATH_RESPONSE {:08x}", token); buf.write(frame::FrameType::PATH_RESPONSE); buf.write(token); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::PathResponse { data: None }); self.stats.frame_tx.path_response += 1; // NOTE: this is technically not required but might be useful to ride the @@ -5013,6 +5126,8 @@ impl Connection { frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no); if buf.remaining_mut() > frame.size() { frame.write(buf); + #[cfg(feature = "qlog")] + qlog.unknown_frame(&frame.get_type()); self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8); @@ -5060,6 +5175,11 @@ impl Connection { ); truncated.encode(buf); self.stats.frame_tx.crypto += 1; + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::Crypto { + offset: truncated.offset, + length: truncated.data.len() as u64, + }); sent.retransmits.get_or_create().crypto.push_back(truncated); if !frame.data.is_empty() { frame.offset += len as u64; @@ -5081,6 +5201,8 @@ impl Connection { error_code, } .encode(buf); + #[cfg(feature = "qlog")] + qlog.unknown_frame(&FrameType::PATH_ABANDON); self.stats.frame_tx.path_abandon += 1; trace!(?path_id, "PATH_ABANDON"); sent.retransmits @@ -5112,6 +5234,8 @@ impl Connection { status_seq_no: seq, } .encode(buf); + #[cfg(feature = "qlog")] + qlog.unknown_frame(&FrameType::PATH_AVAILABLE); self.stats.frame_tx.path_available += 1; trace!(?path_id, %seq, "PATH_AVAILABLE") } @@ -5121,6 +5245,8 @@ impl Connection { status_seq_no: seq, } .encode(buf); + #[cfg(feature = "qlog")] + qlog.unknown_frame(&FrameType::PATH_BACKUP); self.stats.frame_tx.path_backup += 1; trace!(?path_id, %seq, "PATH_BACKUP") } @@ -5133,6 +5259,8 @@ impl Connection { && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut() { frame::MaxPathId(self.local_max_path_id).encode(buf); + #[cfg(feature = "qlog")] + qlog.unknown_frame(&FrameType::MAX_PATH_ID); space.pending.max_path_id = false; sent.retransmits.get_or_create().max_path_id = true; trace!(val = %self.local_max_path_id, "MAX_PATH_ID"); @@ -5145,6 +5273,8 @@ impl Connection { && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut() { frame::PathsBlocked(self.remote_max_path_id).encode(buf); + #[cfg(feature = "qlog")] + qlog.unknown_frame(&FrameType::PATHS_BLOCKED); space.pending.paths_blocked = false; sent.retransmits.get_or_create().paths_blocked = true; trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED"); @@ -5166,6 +5296,8 @@ impl Connection { next_seq: VarInt(next_seq), } .encode(buf); + #[cfg(feature = "qlog")] + qlog.unknown_frame(&FrameType::PATH_CIDS_BLOCKED); sent.retransmits .get_or_create() .path_cids_blocked @@ -5181,6 +5313,7 @@ impl Connection { &mut space.pending, &mut sent.retransmits, &mut self.stats.frame_tx, + qlog, ); } @@ -5237,6 +5370,14 @@ impl Connection { } .encode(buf); sent.retransmits.get_or_create().new_cids.push(issued); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::NewConnectionId { + sequence_number: issued.sequence as u32, + retire_prior_to: retire_prior_to as u32, + connection_id_length: Some(issued.id.len() as u8), + connection_id: format!("{}", issued.id), + stateless_reset_token: Some(format!("{}", issued.reset_token)), + }); } // RETIRE_CONNECTION_ID @@ -5256,6 +5397,10 @@ impl Connection { None => break, }; frame::RetireConnectionId { path_id, sequence }.encode(buf); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::RetireConnectionId { + sequence_number: sequence as u32, + }); sent.retransmits .get_or_create() .retire_cids @@ -5268,11 +5413,18 @@ impl Connection { && buf.remaining_mut() > Datagram::SIZE_BOUND && space_id == SpaceId::Data { + #[cfg(feature = "qlog")] + let prev_remaining = buf.remaining_mut(); match self.datagrams.write(buf) { true => { sent_datagrams = true; sent.non_retransmits = true; self.stats.frame_tx.datagram += 1; + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::Datagram { + length: (prev_remaining - buf.remaining_mut()) as u64, + raw: None, + }); } false => break, } @@ -5320,6 +5472,22 @@ impl Connection { trace!("NEW_TOKEN"); new_token.encode(buf); + #[cfg(feature = "qlog")] + { + use ::qlog; + qlog.frame(QuicFrame::NewToken { + token: qlog::Token { + // TODO: pick the token type some how + ty: Some(qlog::TokenType::Retry), + raw: Some(qlog::events::RawInfo { + data: qlog::HexSlice::maybe_string(Some(&new_token.token)), + length: Some(new_token.token.len() as u64), + payload_length: None, + }), + details: None, + }, + }); + } sent.retransmits .get_or_create() .new_tokens @@ -5329,9 +5497,9 @@ impl Connection { // STREAM if !path_exclusive_only && space_id == SpaceId::Data { - sent.stream_frames = self - .streams - .write_stream_frames(buf, self.config.send_fairness); + sent.stream_frames = + self.streams + .write_stream_frames(buf, self.config.send_fairness, qlog); self.stats.frame_tx.stream += sent.stream_frames.len() as u64; } @@ -5385,6 +5553,7 @@ impl Connection { send_path_acks: bool, buf: &mut impl BufMut, stats: &mut ConnectionStats, + #[allow(unused)] qlog: &mut QlogSentPacket, ) { // 0-RTT packets must never carry acks (which would have to be of handshake packets) debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT"); @@ -5410,12 +5579,31 @@ impl Connection { if !ranges.is_empty() { trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us"); frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf); + #[cfg(feature = "qlog")] + qlog.unknown_frame(&FrameType::PATH_ACK); stats.frame_tx.path_acks += 1; } } else { trace!("ACK {ranges:?}, Delay = {delay_micros}us"); frame::Ack::encode(delay as _, ranges, ecn, buf); stats.frame_tx.acks += 1; + + // TODO: ack ranges + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::Ack { + ack_delay: Some(delay as f32), + acked_ranges: Some(AckedRanges::Double( + ranges + .iter() + .map(|range| (range.start, range.end)) + .collect(), + )), + ect1: ecn.map(|e| e.ect1), + ect0: ecn.map(|e| e.ect0), + ce: ecn.map(|e| e.ce), + length: None, + payload_length: None, + }); } } diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index 10c5ac808..814f84507 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -1,11 +1,13 @@ use bytes::{BufMut, Bytes}; +#[cfg(feature = "qlog")] +use qlog::events::quic::QuicFrame; use rand::Rng; use tracing::{debug, trace, trace_span}; use super::{Connection, PathId, SentFrames, TransmitBuf, spaces::SentPacket}; use crate::{ ConnectionId, Instant, MIN_INITIAL_SIZE, TransportError, TransportErrorCode, - connection::ConnectionSide, + connection::{ConnectionSide, qlog::QlogSentPacket}, frame::{self, Close}, packet::{FIXED_BIT, Header, InitialHeader, LongType, PacketNumber, PartialEncode, SpaceId}, }; @@ -46,6 +48,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { buffer: &'a mut TransmitBuf<'b>, ack_eliciting: bool, conn: &mut Connection, + #[allow(unused)] qlog: &mut QlogSentPacket, ) -> Option where 'b: 'a, @@ -130,6 +133,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { version, }), }; + let partial_encode = header.encode(buffer); if conn.peer_params.grease_quic_bit && conn.rng.random() { buffer.as_mut_slice()[partial_encode.start] ^= FIXED_BIT; @@ -162,6 +166,14 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { let max_size = buffer.datagram_max_offset() - tag_len; debug_assert!(max_size >= min_size); + #[cfg(feature = "qlog")] + qlog.header( + &header, + Some(exact_number), + space_id, + space_id == SpaceId::Data && conn.spaces[SpaceId::Data].crypto.is_none(), + ); + Some(Self { buf: buffer, space: space_id, @@ -204,6 +216,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { path_id: PathId, sent: SentFrames, pad_datagram: PadDatagram, + qlog: QlogSentPacket, ) { match pad_datagram { PadDatagram::No => (), @@ -214,7 +227,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { let ack_eliciting = self.ack_eliciting; let exact_number = self.exact_number; let space_id = self.space; - let (size, padded) = self.finish(conn, now); + let (size, padded) = self.finish(conn, now, qlog); let size = match padded || ack_eliciting { true => size as u16, @@ -254,15 +267,26 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { } /// Encrypt packet, returning the length of the packet and whether padding was added - pub(super) fn finish(self, conn: &mut Connection, now: Instant) -> (usize, bool) { + pub(super) fn finish( + self, + conn: &mut Connection, + now: Instant, + #[allow(unused_mut)] mut qlog: QlogSentPacket, + ) -> (usize, bool) { debug_assert!( self.buf.len() <= self.buf.datagram_max_offset() - self.tag_len, "packet exceeds maximum size" ); let pad = self.buf.len() < self.min_size; if pad { - trace!("PADDING * {}", self.min_size - self.buf.len()); - self.buf.put_bytes(0, self.min_size - self.buf.len()); + let padding = self.min_size - self.buf.len(); + trace!("PADDING * {}", padding); + self.buf.put_bytes(0, padding); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::Padding { + length: Some(padding as u32), + payload_length: padding as u32, + }); } let space = &conn.spaces[self.space]; @@ -293,14 +317,9 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { let packet_len = self.buf.len() - encode_start; trace!(size = %packet_len, short_header = %self.short_header, "wrote packet"); - conn.config.qlog_sink.emit_packet_sent( - self.exact_number, - packet_len, - self.space, - self.space == SpaceId::Data && conn.spaces[SpaceId::Data].crypto.is_none(), - now, - conn.orig_rem_cid, - ); + #[cfg(feature = "qlog")] + qlog.finalize(packet_len); + conn.config.qlog_sink.emit_packet_sent(conn, qlog, now); (packet_len, pad) } diff --git a/quinn-proto/src/connection/qlog.rs b/quinn-proto/src/connection/qlog.rs index a324746f1..e5c431a54 100644 --- a/quinn-proto/src/connection/qlog.rs +++ b/quinn-proto/src/connection/qlog.rs @@ -1,27 +1,33 @@ // Function bodies in this module are regularly cfg'd out #![allow(unused_variables)] -#[cfg(feature = "qlog")] -use std::sync::{Arc, Mutex}; - #[cfg(feature = "qlog")] use qlog::{ events::{ - Event, EventData, + Event, EventData, RawInfo, quic::{ PacketHeader, PacketLost, PacketLostTrigger, PacketReceived, PacketSent, PacketType, + QuicFrame, StreamType, }, }, streamer::QlogStreamer, }; +use std::net::{IpAddr, SocketAddr}; +#[cfg(feature = "qlog")] +use std::sync::{Arc, Mutex}; #[cfg(feature = "qlog")] use tracing::warn; use crate::{ - ConnectionId, Instant, + Connection, ConnectionId, Instant, connection::{PathData, SentPacket}, packet::SpaceId, }; +#[cfg(feature = "qlog")] +use crate::{ + FrameType, + packet::{Header, Packet}, +}; /// Shareable handle to a single qlog output stream #[cfg(feature = "qlog")] @@ -30,10 +36,10 @@ pub struct QlogStream(pub(crate) Arc>); #[cfg(feature = "qlog")] impl QlogStream { - fn emit_event(&self, orig_rem_cid: ConnectionId, event: EventData, now: Instant) { + fn emit_event(&self, initial_dst_cid: ConnectionId, event: EventData, now: Instant) { // Time will be overwritten by `add_event_with_instant` let mut event = Event::with_time(0.0, event); - event.group_id = Some(orig_rem_cid.to_string()); + event.group_id = Some(initial_dst_cid.to_string()); let mut qlog_streamer = self.0.lock().unwrap(); if let Err(e) = qlog_streamer.add_event_with_instant(event, now) { @@ -61,12 +67,49 @@ impl QlogSink { } } + pub(super) fn emit_connection_started( + &self, + now: Instant, + loc_cid: ConnectionId, + rem_cid: ConnectionId, + remote: SocketAddr, + local_ip: Option, + initial_dst_cid: ConnectionId, + ) { + #[cfg(feature = "qlog")] + { + use qlog::events::connectivity::ConnectionStarted; + + let Some(stream) = self.stream.as_ref() else { + return; + }; + // TODO: Review fields. The standard has changed since. + stream.emit_event( + initial_dst_cid, + EventData::ConnectionStarted(ConnectionStarted { + ip_version: Some(String::from(match remote.ip() { + IpAddr::V4(_) => "v4", + IpAddr::V6(_) => "v6", + })), + src_ip: local_ip.map(|addr| addr.to_string()).unwrap_or_default(), + dst_ip: remote.ip().to_string(), + protocol: None, + src_port: None, + dst_port: Some(remote.port()), + src_cid: Some(loc_cid.to_string()), + dst_cid: Some(rem_cid.to_string()), + }), + now, + ); + } + } + pub(super) fn emit_recovery_metrics( &self, pto_count: u32, path: &mut PathData, now: Instant, - orig_rem_cid: ConnectionId, + initial_dst_cid: ConnectionId, ) { #[cfg(feature = "qlog")] { @@ -78,7 +121,7 @@ impl QlogSink { return; }; - stream.emit_event(orig_rem_cid, EventData::MetricsUpdated(metrics), now); + stream.emit_event(initial_dst_cid, EventData::MetricsUpdated(metrics), now); } } @@ -89,7 +132,7 @@ impl QlogSink { lost_send_time: Instant, space: SpaceId, now: Instant, - orig_rem_cid: ConnectionId, + initial_dst_cid: ConnectionId, ) { #[cfg(feature = "qlog")] { @@ -111,63 +154,285 @@ impl QlogSink { }), }; - stream.emit_event(orig_rem_cid, EventData::PacketLost(event), now); + stream.emit_event(initial_dst_cid, EventData::PacketLost(event), now); } } - pub(super) fn emit_packet_sent( - &self, - pn: u64, - len: usize, - space: SpaceId, - is_0rtt: bool, - now: Instant, - orig_rem_cid: ConnectionId, - ) { + pub(super) fn emit_packet_sent(&self, conn: &Connection, packet: QlogSentPacket, now: Instant) { #[cfg(feature = "qlog")] { let Some(stream) = self.stream.as_ref() else { return; }; - - let event = PacketSent { - header: PacketHeader { - packet_number: Some(pn), - packet_type: packet_type(space, is_0rtt), - length: Some(len as u16), - ..Default::default() - }, - ..Default::default() - }; - - stream.emit_event(orig_rem_cid, EventData::PacketSent(event), now); + stream.emit_event( + conn.initial_dst_cid, + EventData::PacketSent(packet.inner), + now, + ); } } pub(super) fn emit_packet_received( &self, - pn: u64, - space: SpaceId, - is_0rtt: bool, + conn: &Connection, + packet: QlogRecvPacket, now: Instant, - orig_rem_cid: ConnectionId, ) { #[cfg(feature = "qlog")] { let Some(stream) = self.stream.as_ref() else { return; }; + let mut packet = packet; + packet.emit_padding(); + let event = packet.inner; + stream.emit_event(conn.initial_dst_cid, EventData::PacketReceived(event), now); + } + } +} - let event = PacketReceived { - header: PacketHeader { - packet_number: Some(pn), - packet_type: packet_type(space, is_0rtt), - ..Default::default() - }, - ..Default::default() - }; +#[derive(Default)] +pub(crate) struct QlogSentPacket { + #[cfg(feature = "qlog")] + inner: PacketSent, +} - stream.emit_event(orig_rem_cid, EventData::PacketReceived(event), now); +#[cfg(feature = "qlog")] +impl QlogSentPacket { + pub(crate) fn header( + &mut self, + header: &Header, + pn: Option, + space: SpaceId, + is_0rtt: bool, + ) { + self.inner.header.scid = header.src_cid().map(encode_cid); + self.inner.header.dcid = Some(encode_cid(header.dst_cid())); + self.inner.header.packet_number = pn; + self.inner.header.packet_type = packet_type(space, is_0rtt); + } + + pub(crate) fn frame(&mut self, frame: QuicFrame) { + self.inner.frames.get_or_insert_default().push(frame); + } + + pub(crate) fn unknown_frame(&mut self, frame: &FrameType) { + let ty = frame.to_u64(); + self.frame(unknown_frame(frame)) + } + + pub(super) fn finalize(&mut self, len: usize) { + self.inner.header.length = Some(len as u16); + } +} + +pub(crate) struct QlogRecvPacket { + #[cfg(feature = "qlog")] + inner: PacketReceived, + #[cfg(feature = "qlog")] + padding: usize, +} + +#[cfg(not(feature = "qlog"))] +impl QlogRecvPacket { + pub(crate) fn new(_len: usize) -> Self { + Self {} + } +} + +#[cfg(feature = "qlog")] +impl QlogRecvPacket { + pub(crate) fn new(len: usize) -> Self { + let mut this = Self { + inner: Default::default(), + padding: 0, + }; + this.inner.header.length = Some(len as u16); + this + } + + pub(crate) fn header(&mut self, packet: &Packet, pn: Option) { + let header = &packet.header; + let is_0rtt = !packet.header.is_1rtt(); + self.inner.header.scid = header.src_cid().map(encode_cid); + self.inner.header.dcid = Some(encode_cid(header.dst_cid())); + self.inner.header.packet_number = pn; + self.inner.header.packet_type = packet_type(header.space(), is_0rtt); + } + + fn emit_padding(&mut self) { + if self.padding > 0 { + self.inner + .frames + .get_or_insert_default() + .push(QuicFrame::Padding { + length: Some(self.padding as u32), + payload_length: self.padding as u32, + }); + self.padding = 0; + } + } + + pub(crate) fn frame(&mut self, frame: &crate::Frame) { + if matches!(frame, crate::Frame::Padding) { + self.padding += 1; + } else { + self.emit_padding(); + self.inner + .frames + .get_or_insert_default() + .push(frame.to_qlog()) + } + } +} + +#[cfg(feature = "qlog")] +fn unknown_frame(frame: &FrameType) -> QuicFrame { + let ty = frame.to_u64(); + QuicFrame::Unknown { + raw_frame_type: ty, + frame_type_value: Some(ty), + raw: Some(RawInfo { + length: None, + payload_length: None, + data: Some(format!("{frame}")), + }), + } +} + +#[cfg(feature = "qlog")] +impl crate::Frame { + fn to_qlog(&self) -> QuicFrame { + use qlog::events::quic::AckedRanges; + + match self { + Self::Padding => QuicFrame::Padding { + length: None, + payload_length: 1, + }, + Self::Ping => QuicFrame::Ping { + length: None, + payload_length: None, + }, + Self::Ack(ack) => QuicFrame::Ack { + ack_delay: Some(ack.delay as f32), + acked_ranges: Some(AckedRanges::Double( + ack.iter() + .map(|range| (*range.start(), *range.end())) + .collect(), + )), + ect1: ack.ecn.as_ref().map(|e| e.ect1), + ect0: ack.ecn.as_ref().map(|e| e.ect0), + ce: ack.ecn.as_ref().map(|e| e.ce), + length: None, + payload_length: None, + }, + Self::ResetStream(f) => QuicFrame::ResetStream { + stream_id: f.id.into(), + error_code: f.error_code.into(), + final_size: f.final_offset.into(), + length: None, + payload_length: None, + }, + Self::StopSending(f) => QuicFrame::StopSending { + stream_id: f.id.into(), + error_code: f.error_code.into(), + length: None, + payload_length: None, + }, + Self::Crypto(c) => QuicFrame::Crypto { + offset: c.offset, + length: c.data.len() as u64, + }, + Self::NewToken(t) => { + use ::qlog; + QuicFrame::NewToken { + token: qlog::Token { + ty: Some(::qlog::TokenType::Retry), + raw: Some(qlog::events::RawInfo { + data: qlog::HexSlice::maybe_string(Some(&t.token)), + length: Some(t.token.len() as u64), + payload_length: None, + }), + details: None, + }, + } + } + Self::Stream(s) => QuicFrame::Stream { + stream_id: s.id.into(), + offset: s.offset, + length: s.data.len() as u64, + fin: Some(s.fin), + raw: None, + }, + Self::MaxData(v) => QuicFrame::MaxData { + maximum: (*v).into(), + }, + Self::MaxStreamData { id, offset } => QuicFrame::MaxStreamData { + stream_id: (*id).into(), + maximum: *offset, + }, + Self::MaxStreams { dir, count } => QuicFrame::MaxStreams { + maximum: *count, + stream_type: (*dir).into(), + }, + Self::DataBlocked { offset } => QuicFrame::DataBlocked { limit: *offset }, + Self::StreamDataBlocked { id, offset } => QuicFrame::StreamDataBlocked { + stream_id: (*id).into(), + limit: *offset, + }, + Self::StreamsBlocked { dir, limit } => QuicFrame::StreamsBlocked { + stream_type: (*dir).into(), + limit: *limit, + }, + Self::NewConnectionId(f) => QuicFrame::NewConnectionId { + sequence_number: f.sequence as u32, + retire_prior_to: f.retire_prior_to as u32, + connection_id_length: Some(f.id.len() as u8), + connection_id: format!("{}", f.id), + stateless_reset_token: Some(format!("{}", f.reset_token)), + }, + Self::RetireConnectionId(f) => QuicFrame::RetireConnectionId { + sequence_number: f.sequence as u32, + }, + Self::PathChallenge(_) => QuicFrame::PathChallenge { data: None }, + Self::PathResponse(_) => QuicFrame::PathResponse { data: None }, + Self::Close(close) => QuicFrame::ConnectionClose { + error_space: None, + error_code: Some(close.error_code()), + error_code_value: None, + reason: None, + trigger_frame_type: None, + }, + Self::Datagram(d) => QuicFrame::Datagram { + length: d.data.len() as u64, + raw: None, + }, + Self::HandshakeDone => QuicFrame::HandshakeDone, + // Extensions and unsupported frames → Unknown + Self::AckFrequency(_) + | Self::ImmediateAck + | Self::ObservedAddr(_) + | Self::PathAck(_) + | Self::PathAbandon(_) + | Self::PathAvailable(_) + | Self::PathBackup(_) + | Self::MaxPathId(_) + | Self::PathsBlocked(_) + | Self::PathCidsBlocked(_) + | Self::AddAddress(_) + | Self::ReachOut(_) + | Self::RemoveAddress(_) => unknown_frame(&self.ty()), + } + } +} + +#[cfg(feature = "qlog")] +impl From for StreamType { + fn from(value: crate::Dir) -> Self { + match value { + crate::Dir::Bi => Self::Bidirectional, + crate::Dir::Uni => Self::Unidirectional, } } } @@ -188,3 +453,8 @@ fn packet_type(space: SpaceId, is_0rtt: bool) -> PacketType { SpaceId::Data => PacketType::OneRtt, } } + +#[cfg(feature = "qlog")] +fn encode_cid(cid: ConnectionId) -> String { + format!("{cid}") +} diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index 7837b0707..018ddb7fe 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -5,6 +5,8 @@ use std::{ }; use bytes::BufMut; +#[cfg(feature = "qlog")] +use qlog::events::quic::QuicFrame; use rustc_hash::FxHashMap; use tracing::{debug, trace}; @@ -15,7 +17,7 @@ use super::{ use crate::{ Dir, MAX_STREAM_COUNT, Side, StreamId, TransportError, VarInt, coding::BufMutExt, - connection::stats::FrameStats, + connection::{qlog::QlogSentPacket, stats::FrameStats}, frame::{self, FrameStruct, StreamMetaVec}, transport_parameters::TransportParameters, }; @@ -415,6 +417,7 @@ impl StreamsState { pending: &mut Retransmits, retransmits: &mut ThinRetransmits, stats: &mut FrameStats, + #[allow(unused)] qlog: &mut QlogSentPacket, ) { // RESET_STREAM while buf.remaining_mut() > frame::ResetStream::SIZE_BOUND { @@ -437,6 +440,14 @@ impl StreamsState { final_offset: VarInt::try_from(stream.offset()).expect("impossibly large offset"), } .encode(buf); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::ResetStream { + stream_id: id.into(), + error_code: error_code.into(), + final_size: stream.offset(), + length: None, + payload_length: None, + }); stats.reset_stream += 1; } @@ -455,6 +466,13 @@ impl StreamsState { // can't be relied upon regardless. trace!(stream = %frame.id, "STOP_SENDING"); frame.encode(buf); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::StopSending { + stream_id: frame.id.into(), + error_code: frame.error_code.into(), + length: None, + payload_length: None, + }); retransmits.get_or_create().stop_sending.push(frame); stats.stop_sending += 1; } @@ -479,6 +497,10 @@ impl StreamsState { retransmits.get_or_create().max_data = true; buf.write(frame::FrameType::MAX_DATA); buf.write(max); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::MaxData { + maximum: max.into(), + }); stats.max_data += 1; } @@ -510,6 +532,11 @@ impl StreamsState { buf.write(frame::FrameType::MAX_STREAM_DATA); buf.write(id); buf.write_var(max); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::MaxStreamData { + stream_id: id.into(), + maximum: max, + }); stats.max_stream_data += 1; } @@ -531,6 +558,14 @@ impl StreamsState { Dir::Bi => frame::FrameType::MAX_STREAMS_BIDI, }); buf.write_var(self.max_remote[dir as usize]); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::MaxStreams { + maximum: self.max_remote[dir as usize], + stream_type: match dir { + Dir::Bi => qlog::events::quic::StreamType::Bidirectional, + Dir::Uni => qlog::events::quic::StreamType::Unidirectional, + }, + }); match dir { Dir::Uni => stats.max_streams_uni += 1, Dir::Bi => stats.max_streams_bidi += 1, @@ -542,6 +577,7 @@ impl StreamsState { &mut self, buf: &mut impl BufMut, fair: bool, + #[allow(unused)] qlog: &mut QlogSentPacket, ) -> StreamMetaVec { let mut stream_frames = StreamMetaVec::new(); while buf.remaining_mut() > frame::Stream::SIZE_BOUND { @@ -592,6 +628,14 @@ impl StreamsState { let meta = frame::StreamMeta { id, offsets, fin }; trace!(id = %meta.id, off = meta.offsets.start, len = meta.offsets.end - meta.offsets.start, fin = meta.fin, "STREAM"); meta.encode(encode_length, buf); + #[cfg(feature = "qlog")] + qlog.frame(QuicFrame::Stream { + stream_id: meta.id.into(), + offset: meta.offsets.start, + length: meta.offsets.end - meta.offsets.start, + fin: Some(meta.fin), + raw: None, + }); // The range might not be retrievable in a single `get` if it is // stored in noncontiguous fashion. Therefore this loop iterates @@ -1378,7 +1422,7 @@ mod tests { high.write(b"high").unwrap(); let mut buf = Vec::with_capacity(40); - let meta = server.write_stream_frames(&mut buf, true); + let meta = server.write_stream_frames(&mut buf, true, &mut Default::default()); assert_eq!(meta[0].id, id_high); assert_eq!(meta[1].id, id_mid); assert_eq!(meta[2].id, id_low); @@ -1437,7 +1481,7 @@ mod tests { high.set_priority(-1).unwrap(); let mut buf = Vec::with_capacity(1000).limit(40); - let meta = server.write_stream_frames(&mut buf, true); + let meta = server.write_stream_frames(&mut buf, true, &mut Default::default()); assert_eq!(meta.len(), 1); assert_eq!(meta[0].id, id_high); @@ -1447,7 +1491,7 @@ mod tests { let mut buf = buf.into_inner(); // Send the remaining data. The initial mid priority one should go first now - let meta = server.write_stream_frames(&mut buf, true); + let meta = server.write_stream_frames(&mut buf, true, &mut Default::default()); assert_eq!(meta.len(), 2); assert_eq!(meta[0].id, id_mid); assert_eq!(meta[1].id, id_high); @@ -1508,7 +1552,8 @@ mod tests { // loop until all the streams are written loop { let mut chunk_buf = buf.limit(40); - let meta = server.write_stream_frames(&mut chunk_buf, fair); + let meta = + server.write_stream_frames(&mut chunk_buf, fair, &mut Default::default()); if meta.is_empty() { break; } @@ -1580,7 +1625,7 @@ mod tests { // Write the first chunk of stream_a let mut chunk_buf = buf.limit(40); - let meta = server.write_stream_frames(&mut chunk_buf, false); + let meta = server.write_stream_frames(&mut chunk_buf, false, &mut Default::default()); let mut buf = chunk_buf.into_inner(); assert!(!meta.is_empty()); metas.extend(meta); @@ -1598,7 +1643,7 @@ mod tests { // loop until all the streams are written loop { let mut chunk_buf = buf.limit(40); - let meta = server.write_stream_frames(&mut chunk_buf, false); + let meta = server.write_stream_frames(&mut chunk_buf, false, &mut Default::default()); buf = chunk_buf.into_inner(); if meta.is_empty() { break; diff --git a/quinn-proto/src/frame.rs b/quinn-proto/src/frame.rs index 54f04cd48..9cfd95063 100644 --- a/quinn-proto/src/frame.rs +++ b/quinn-proto/src/frame.rs @@ -39,6 +39,11 @@ impl FrameType { None } } + + #[cfg(feature = "qlog")] + pub(crate) fn to_u64(self) -> u64 { + self.0 + } } impl coding::Codec for FrameType { @@ -338,6 +343,13 @@ impl Close { pub(crate) fn is_transport_layer(&self) -> bool { matches!(*self, Self::Connection(_)) } + + pub(crate) fn error_code(&self) -> u64 { + match self { + Self::Connection(frame) => frame.error_code.into(), + Self::Application(frame) => frame.error_code.into(), + } + } } impl From for Close { diff --git a/quinn-proto/src/lib.rs b/quinn-proto/src/lib.rs index 5e5cd018e..b9dd318b9 100644 --- a/quinn-proto/src/lib.rs +++ b/quinn-proto/src/lib.rs @@ -57,12 +57,12 @@ pub use rustls; mod config; #[cfg(doc)] pub use config::DEFAULT_CONCURRENT_MULTIPATH_PATHS_WHEN_ENABLED; -#[cfg(feature = "qlog")] -pub use config::{QlogConfig, VantagePointType}; pub use config::{ AckFrequencyConfig, ClientConfig, ConfigError, EndpointConfig, IdleTimeout, MtuDiscoveryConfig, ServerConfig, StdSystemTime, TimeSource, TransportConfig, ValidationTokenConfig, }; +#[cfg(feature = "qlog")] +pub use config::{QlogConfig, VantagePointType}; pub mod crypto; diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index 28b925742..f2871cc07 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -475,6 +475,17 @@ impl Header { VersionNegotiate { .. } => false, } } + + #[cfg(feature = "qlog")] + pub(crate) fn src_cid(&self) -> Option { + match self { + Self::Initial(initial_header) => Some(initial_header.src_cid), + Self::Long { src_cid, .. } => Some(*src_cid), + Self::Retry { src_cid, .. } => Some(*src_cid), + Self::Short { .. } => None, + Self::VersionNegotiate { src_cid, .. } => Some(*src_cid), + } + } } pub(crate) struct PartialEncode {