Skip to content

Commit

Permalink
Update qlog events
Browse files Browse the repository at this point in the history
- add recovery_metrics_updated event
- add recovery_packet_lost event
- update quic_packet_received event
- update quic_packet_sent event
- add tquic_qvis.sh to covert qlog files to be compatible with qvis
  • Loading branch information
iyangsj committed Jan 23, 2024
1 parent e9a0f7a commit f27e906
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 37 deletions.
4 changes: 2 additions & 2 deletions src/congestion_control/cubic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ mod tests {
has_data: false,
sent_size: pkt_size as usize,
rate_sample_state: Default::default(),
reinjected: false,
..SentPacket::default()
});
}

Expand Down Expand Up @@ -644,7 +644,7 @@ mod tests {
has_data: false,
sent_size: pkt_size as usize,
rate_sample_state: Default::default(),
reinjected: false,
..SentPacket::default()
});
}

Expand Down
9 changes: 4 additions & 5 deletions src/congestion_control/delivery_rate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ mod tests {
has_data: false,
sent_size: 240,
rate_sample_state: Default::default(),
reinjected: false,
..SentPacket::default()
};

rate_estimator.on_packet_sent(&mut pkt_n1, bytes_in_flight, bytes_lost);
Expand All @@ -308,7 +308,7 @@ mod tests {
has_data: false,
sent_size: 240,
rate_sample_state: Default::default(),
reinjected: false,
..SentPacket::default()
};

bytes_in_flight += pkt_n1.sent_size as u64;
Expand Down Expand Up @@ -361,7 +361,6 @@ mod tests {
let now = Instant::now();
let mut pkts_part1: Vec<SentPacket> = Vec::new();
let mut pkts_part2: Vec<SentPacket> = Vec::new();
// let mut pkts_part3: Vec<SentPacket> = Vec::new();
let bytes_lost = 0;
let mut bytes_in_flight = 0;
let pkt_size: u64 = 240;
Expand All @@ -380,7 +379,7 @@ mod tests {
has_data: false,
sent_size: pkt_size as usize,
rate_sample_state: Default::default(),
reinjected: false,
..SentPacket::default()
});
}

Expand All @@ -397,7 +396,7 @@ mod tests {
has_data: false,
sent_size: pkt_size as usize,
rate_sample_state: Default::default(),
reinjected: false,
..SentPacket::default()
});
}

Expand Down
64 changes: 54 additions & 10 deletions src/connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ impl Connection {
// Process each QUIC frame in the QUIC packet
let mut ack_eliciting_pkt = false;
let mut probing_pkt = true;
let mut qframes = vec![];

while !payload.is_empty() {
let (frame, len) = Frame::from_bytes(&mut payload, hdr.pkt_type)?;
Expand All @@ -592,14 +593,23 @@ impl Connection {
if !frame.probing() {
probing_pkt = false;
}
if self.qlog.is_some() {
qframes.push(frame.to_qlog());
}

self.recv_frame(frame, &hdr, pid, space_id, info.time)?;
let _ = payload.split_to(len);
}

// Write TransportPacketReceived event to qlog.
// Write events to qlog.
if let Some(qlog) = &mut self.qlog {
Self::qlog_quic_packet_received(qlog, &hdr, pkt_num, read, payload_len);
// Write TransportPacketReceived event to qlog.
Self::qlog_quic_packet_received(qlog, &hdr, pkt_num, read, payload_len, qframes);

// Write RecoveryMetricsUpdate event to qlog.
if let Ok(path) = self.paths.get_mut(pid) {
path.recovery.qlog_recovery_metrics_updated(qlog);
}
}

// Process acknowledged frames.
Expand Down Expand Up @@ -715,6 +725,7 @@ impl Connection {
space_id,
&mut self.spaces,
handshake_status,
self.qlog.as_mut(),
now,
)?;
self.stats.lost_count += lost_pkts;
Expand Down Expand Up @@ -1557,6 +1568,7 @@ impl Connection {
)?;

let sent_pkt = space::SentPacket {
pkt_type,
pkt_num,
time_sent: now,
time_acked: None,
Expand All @@ -1577,9 +1589,19 @@ impl Connection {
self.paths.get(path_id)?
);

// Write TransportPacketSent event to qlog.
// Write events to qlog.
if let Some(qlog) = &mut self.qlog {
Self::qlog_quic_packet_sent(qlog, &hdr, pkt_num, written, payload_len);
// Write TransportPacketSent event to qlog.
let mut qframes = Vec::with_capacity(sent_pkt.frames.len());
for frame in &sent_pkt.frames {
qframes.push(frame.to_qlog());
}
Self::qlog_quic_packet_sent(qlog, &hdr, pkt_num, written, payload_len, qframes);

// Write RecoveryMetricsUpdate event to qlog.
if let Ok(path) = self.paths.get_mut(path_id) {
path.recovery.qlog_recovery_metrics_updated(qlog);
}
}

// Notify the packet sent event to the multipath scheduler
Expand Down Expand Up @@ -2862,8 +2884,14 @@ impl Connection {
SpaceId::Data, // TODO: update for multipath
&mut self.spaces,
handshake_status,
self.qlog.as_mut(),
now,
);

// Write RecoveryMetricsUpdate event to qlog.
if let Some(qlog) = &mut self.qlog {
path.recovery.qlog_recovery_metrics_updated(qlog);
}
}
}
}
Expand Down Expand Up @@ -3637,6 +3665,7 @@ impl Connection {
pkt_num: u64,
pkt_len: usize,
payload_len: usize,
qlog_frames: Vec<qlog::events::QuicFrame>,
) {
let qlog_pkt_hdr = events::PacketHeader::new_with_type(
hdr.pkt_type.to_qlog(),
Expand All @@ -3652,6 +3681,7 @@ impl Connection {
};
let ev_data = events::EventData::QuicPacketReceived {
header: qlog_pkt_hdr,
frames: Some(qlog_frames.into()),
is_coalesced: None,
retry_token: None,
stateless_reset_token: None,
Expand All @@ -3670,6 +3700,7 @@ impl Connection {
pkt_num: u64,
pkt_len: usize,
payload_len: usize,
qlog_frames: Vec<qlog::events::QuicFrame>,
) {
let qlog_pkt_hdr = events::PacketHeader::new_with_type(
hdr.pkt_type.to_qlog(),
Expand All @@ -3687,6 +3718,7 @@ impl Connection {

let ev_data = events::EventData::QuicPacketSent {
header: qlog_pkt_hdr,
frames: Some(qlog_frames.into()),
is_coalesced: None,
retry_token: None,
stateless_reset_token: None,
Expand Down Expand Up @@ -6611,36 +6643,47 @@ pub(crate) mod tests {
let mut sfile = slog.reopen().unwrap();

let mut test_pair = TestPair::new_with_test_config()?;
assert_eq!(test_pair.handshake(), Ok(()));
test_pair
.client
.set_qlog(Box::new(clog), "title".into(), "desc".into());
test_pair
.server
.set_qlog(Box::new(slog), "title".into(), "desc".into());
assert_eq!(test_pair.handshake(), Ok(()));

// Client create a stream and send data
let data = Bytes::from_static(b"test data over quic");
test_pair.client.stream_set_priority(0, 0, false)?;
test_pair.client.stream_write(0, data.clone(), true)?;
test_pair.client.stream_shutdown(0, Shutdown::Read, 0)?;
test_pair.client.stream_write(0, data.clone(), false)?;
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;
TestPair::conn_packets_in(&mut test_pair.server, packets)?;

// Server read data from the stream
// Client lost some packets
test_pair.client.stream_write(0, data.clone(), false)?;
let _ = TestPair::conn_packets_out(&mut test_pair.client)?;
test_pair.client.stream_write(0, data.clone(), false)?;
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;
TestPair::conn_packets_in(&mut test_pair.server, packets)?;

// Server read data from the stream
let mut buf = vec![0; data.len()];
test_pair.server.stream_read(0, &mut buf)?;

let packets = TestPair::conn_packets_out(&mut test_pair.server)?;
TestPair::conn_packets_in(&mut test_pair.client, packets)?;

// Advance ticks until loss timeout
assert!(test_pair.client.timeout().is_some());
let timeout = test_pair.client.timers.get(Timer::LossDetection);
test_pair.client.on_timeout(timeout.unwrap());

// Check client qlog
let mut clog_content = String::new();
cfile.read_to_string(&mut clog_content).unwrap();
assert_eq!(clog_content.contains("client"), true);
assert_eq!(clog_content.contains("quic:parameters_set"), true);
assert_eq!(clog_content.contains("quic:stream_data_moved"), true);
assert_eq!(clog_content.contains("quic:packet_sent"), true);
assert_eq!(clog_content.contains("recovery:metrics_updated"), true);
assert_eq!(clog_content.contains("recovery:packet_lost"), true);

// Check server qlog
let mut slog_content = String::new();
Expand All @@ -6649,6 +6692,7 @@ pub(crate) mod tests {
assert_eq!(slog_content.contains("quic:parameters_set"), true);
assert_eq!(slog_content.contains("quic:stream_data_moved"), true);
assert_eq!(slog_content.contains("quic:packet_received"), true);
assert_eq!(slog_content.contains("recovery:metrics_updated"), true);

Ok(())
}
Expand Down
Loading

0 comments on commit f27e906

Please sign in to comment.