Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve acknowledgement strategy #283

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 229 additions & 8 deletions src/connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ use log::*;
use strum::IntoEnumIterator;

use self::cid::ConnectionIdItem;
use self::space::BufferFlags;
use self::space::BufferType;
use self::space::PacketNumSpace;
use self::space::RateSamplePacketState;
use self::space::SpaceId;
use self::stream::Stream;
use self::stream::StreamIter;
use self::timer::Timer;
use self::ConnectionFlags::*;
use crate::codec;
use crate::codec::Decoder;
use crate::codec::Encoder;
use crate::connection::space::BufferFlags;
use crate::connection::space::BufferType;
use crate::connection::space::RateSamplePacketState;
use crate::error::ConnectionError;
use crate::error::Error;
use crate::frame;
Expand Down Expand Up @@ -74,8 +76,6 @@ use crate::RecoveryConfig;
use crate::Result;
use crate::Shutdown;

use self::space::SpaceId;

/// A QUIC connection.
pub struct Connection {
/// QUIC version used for the connection.
Expand Down Expand Up @@ -650,13 +650,18 @@ impl Connection {
}
space.recv_pkt_num_win.insert(pkt_num);
space.recv_pkt_num_need_ack.add_elem(pkt_num);
space.need_send_ack = space.need_send_ack || ack_eliciting_pkt;
space.largest_rx_pkt_num = cmp::max(space.largest_rx_pkt_num, pkt_num);
if !probing_pkt {
space.largest_rx_non_probing_pkt_num =
cmp::max(space.largest_rx_non_probing_pkt_num, pkt_num);
// TODO: try to do connection migration
}
if ack_eliciting_pkt {
space.largest_rx_ack_eliciting_pkt_num =
cmp::max(space.largest_rx_ack_eliciting_pkt_num, pkt_num);
}

self.try_schedule_ack_frame(space_id, pkt_num, ack_eliciting_pkt)?;

// An endpoint restarts its idle timer when a packet from its peer is
// received and processed successfully.
Expand Down Expand Up @@ -1254,6 +1259,68 @@ impl Connection {
}
}

/// Check and schedule an ACK frame to acknowledge incoming packets.
fn try_schedule_ack_frame(
&mut self,
space_id: SpaceId,
pkt_num: u64,
ack_eliciting: bool,
) -> Result<()> {
if !ack_eliciting {
return Ok(());
}

let space = self.spaces.get_mut(space_id).ok_or(Error::InternalError)?;
if space.need_send_ack {
return Ok(());
}

// An endpoint MUST acknowledge all ack-eliciting Initial and Handshake
// packets immediately
if space.id == SpaceId::Initial || space.id == SpaceId::Handshake {
space.need_send_ack = true;
return Ok(());
}

// A receiver SHOULD send an ACK frame after receiving at least two
// ack-eliciting packets.
space.ack_eliciting_pkts_since_last_sent_ack += 1;
let ack_eliciting_threshold = self.recovery_conf.ack_eliciting_threshold;
if space.ack_eliciting_pkts_since_last_sent_ack >= ack_eliciting_threshold {
space.need_send_ack = true;
space.ack_timer = None;
return Ok(());
}

// In order to assist loss detection at the sender, an endpoint SHOULD
// generate and send an ACK frame without delay when it receives an
// ack-eliciting packet either:
// - when the received packet has a packet number less than another
// ack-eliciting packet that has been received, or
// - when the packet has a packet number larger than the highest-numbered
// ack-eliciting packet that has been received and there are missing
// packets between that packet and this packet.
if pkt_num < space.largest_rx_ack_eliciting_pkt_num
|| pkt_num > space.largest_rx_ack_eliciting_pkt_num + 1
{
space.need_send_ack = true;
space.ack_timer = None;
return Ok(());
}

// All ack-eliciting 0-RTT and 1-RTT packets within its advertised
// max_ack_delay.
if space.ack_timer.is_none() {
let ack_delay = time::Duration::from_millis(self.peer_transport_params.max_ack_delay);
space.ack_timer = Some(time::Instant::now() + ack_delay);
debug!(
"{} set ack timer for space {:?}, timeout {:?} ",
&self.trace_id, space_id, space.ack_timer
);
}
Ok(())
}

/// A server could receive packets protected with 0-RTT keys prior to
/// receiving a TLS ClientHello. The server MAY retain these packets for
/// later decryption in anticipation of receiving a ClientHello.
Expand Down Expand Up @@ -1995,6 +2062,7 @@ impl Connection {
};
Connection::write_frame_to_packet(frame, out, st)?;
space.need_send_ack = false;
space.ack_eliciting_pkts_since_last_sent_ack = 0;

Ok(())
}
Expand Down Expand Up @@ -3039,6 +3107,10 @@ impl Connection {
Some(time) => self.timers.set(Timer::PathChallenge, time),
None => self.timers.stop(Timer::PathChallenge),
}
match self.spaces.min_ack_timer() {
Some(time) => self.timers.set(Timer::Ack, time),
None => self.timers.stop(Timer::Ack),
}

self.timers.next_timeout()
};
Expand Down Expand Up @@ -3091,6 +3163,19 @@ impl Connection {
}
}

Timer::Ack => {
for (_, space) in self.spaces.iter_mut() {
if let Some(timer) = space.ack_timer {
if timer > now {
continue;
}
debug!("{} ack timeout for space {:?}", self.trace_id, space.id);
space.need_send_ack = true;
space.ack_timer = None;
}
}
}

Timer::Idle => {
info!("{} idle timeout", self.trace_id);
self.flags.insert(Closed);
Expand Down Expand Up @@ -5364,6 +5449,7 @@ pub(crate) mod tests {
server_config.set_recv_udp_payload_size(1550);
server_config.set_initial_max_data(10000);
server_config.set_initial_max_stream_data_bidi_remote(10000);
server_config.set_ack_eliciting_threshold(1);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
assert_eq!(
test_pair.client.paths.get(0)?.recovery.max_datagram_size,
Expand Down Expand Up @@ -5816,8 +5902,10 @@ pub(crate) mod tests {
for case in cases {
let mut client_config = TestPair::new_test_config(false)?;
client_config.enable_dplpmtud(case.0);
client_config.set_ack_eliciting_threshold(1);
let mut server_config = TestPair::new_test_config(true)?;
server_config.enable_dplpmtud(case.1);
server_config.set_ack_eliciting_threshold(1);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
assert_eq!(test_pair.handshake(), Ok(()));

Expand Down Expand Up @@ -5850,10 +5938,12 @@ pub(crate) mod tests {
for case in cases {
let mut client_config = TestPair::new_test_config(false)?;
client_config.enable_dplpmtud(true);
client_config.set_ack_eliciting_threshold(1);
let mut server_config = TestPair::new_test_config(true)?;
server_config.enable_dplpmtud(false);
server_config.set_initial_max_data(10240);
server_config.set_initial_max_stream_data_bidi_remote(10240);
server_config.set_ack_eliciting_threshold(1);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
let router_mtu: usize = case.0;

Expand Down Expand Up @@ -6278,6 +6368,122 @@ pub(crate) mod tests {
Ok(())
}

#[test]
fn ack_initial_or_handshake_space() -> Result<()> {
let mut client_config = TestPair::new_test_config(false)?;
client_config.set_ack_eliciting_threshold(2);
let mut server_config = TestPair::new_test_config(true)?;
server_config.set_ack_eliciting_threshold(2);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;

// Client send 1 UDP datagram carrying 1 Initial packet
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;
assert_eq!(packets.len(), 1);

// Server send 2 UDP datagrams carrying 1 Initial packet and 2 Handshake packets
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
let packets = TestPair::conn_packets_out(&mut test_pair.server)?;
assert_eq!(packets.len(), 2);

// Client's Initial must be acknowledged immediately
TestPair::conn_packets_in(&mut test_pair.client, packets)?;
{
let stat = test_pair.client.paths.get_active_mut()?.stats();
assert_eq!(stat.acked_count, 1);
}

// Client send Handshake and completes handshake.
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;

// Server's Initial/Handshake must be acknowledged immediately
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
{
let stat = test_pair.server.paths.get_active_mut()?.stats();
assert_eq!(stat.acked_count, 3);
}

Ok(())
}

#[test]
fn ack_data_space_ack_eliciting_threshold() -> Result<()> {
let mut client_config = TestPair::new_test_config(false)?;
client_config.set_ack_eliciting_threshold(4);
client_config.enable_dplpmtud(false);
let mut server_config = TestPair::new_test_config(true)?;
server_config.set_ack_eliciting_threshold(4);
server_config.enable_dplpmtud(false);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
assert_eq!(test_pair.handshake(), Ok(()));
test_pair.move_forward()?;

let data = Bytes::from_static(b"QUIC");
let sid = test_pair.client.stream_bidi_new(0, false)?;
let acked_pkts = test_pair.client.paths.get_active_mut()?.stats().acked_count;

for i in 0..4 {
// Client write data on the stream
test_pair.client.stream_write(sid, data.clone(), false)?;
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;

// Server recv packets from the client
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
let packets = TestPair::conn_packets_out(&mut test_pair.server)?;

TestPair::conn_packets_in(&mut test_pair.client, packets)?;
let new_acked_pkts = test_pair.client.paths.get_active_mut()?.stats().acked_count;
if i < 3 {
assert_eq!(acked_pkts, new_acked_pkts);
} else {
assert_eq!(acked_pkts + 4, new_acked_pkts);
}
}

Ok(())
}

#[test]
fn ack_data_space_ack_timeout() -> Result<()> {
let mut client_config = TestPair::new_test_config(false)?;
client_config.set_ack_eliciting_threshold(4);
client_config.enable_dplpmtud(false);
let mut server_config = TestPair::new_test_config(true)?;
server_config.set_ack_eliciting_threshold(4);
server_config.enable_dplpmtud(false);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
assert_eq!(test_pair.handshake(), Ok(()));
test_pair.move_forward()?;

let data = Bytes::from_static(b"QUIC");
let sid = test_pair.client.stream_bidi_new(0, false)?;
let acked_pkts = test_pair.client.paths.get_active_mut()?.stats().acked_count;

// Client write data on the stream
test_pair.client.stream_write(sid, data.clone(), false)?;
let packets = TestPair::conn_packets_out(&mut test_pair.client)?;

// Server recv packets from the client
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
let packets = TestPair::conn_packets_out(&mut test_pair.server)?;
assert_eq!(packets.len(), 0);

// Advance server ticks until ack timeout
assert!(test_pair.server.timeout().is_some());
let ack_timeout = test_pair.server.timers.get(Timer::Ack);
assert!(ack_timeout.is_some());
let now = ack_timeout.unwrap();
test_pair.server.on_timeout(now);

// Server send ack
TestPair::conn_packets_in(&mut test_pair.server, packets)?;
let packets = TestPair::conn_packets_out(&mut test_pair.server)?;
TestPair::conn_packets_in(&mut test_pair.client, packets)?;
let new_acked_pkts = test_pair.client.paths.get_active_mut()?.stats().acked_count;
assert_eq!(acked_pkts + 1, new_acked_pkts);

Ok(())
}

#[test]
fn conn_close_by_application() -> Result<()> {
// Establish a connection
Expand Down Expand Up @@ -6705,7 +6911,12 @@ pub(crate) mod tests {

#[test]
fn conn_max_streams_bidi() -> Result<()> {
let mut test_pair = TestPair::new_with_test_config()?;
let mut client_config = TestPair::new_test_config(false)?;
client_config.set_ack_eliciting_threshold(1);
let mut server_config = TestPair::new_test_config(true)?;

let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;

assert_eq!(test_pair.handshake(), Ok(()));

// Client create bidi streams
Expand Down Expand Up @@ -6864,7 +7075,13 @@ pub(crate) mod tests {

#[test]
fn stream_reset() -> Result<()> {
let mut test_pair = TestPair::new_with_test_config()?;
let mut client_config = TestPair::new_test_config(false)?;
client_config.set_ack_eliciting_threshold(1);
let mut server_config = TestPair::new_test_config(true)?;
server_config.set_ack_eliciting_threshold(1);

let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;

assert_eq!(test_pair.handshake(), Ok(()));
let mut buf = vec![0; 16];

Expand Down Expand Up @@ -6982,12 +7199,14 @@ pub(crate) mod tests {
client_config.set_cid_len(crate::MAX_CID_LEN);
client_config.enable_multipath(true);
client_config.set_multipath_algorithm(MultipathAlgorithm::Redundant);
client_config.set_ack_eliciting_threshold(1);
let mut server_config = TestPair::new_test_config(true)?;
server_config.set_cid_len(crate::MAX_CID_LEN);

// Handshake with multipath enabled
server_config.enable_multipath(true);
server_config.set_multipath_algorithm(MultipathAlgorithm::Redundant);
server_config.set_ack_eliciting_threshold(1);
let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;

let blocks = vec![
Expand All @@ -7012,11 +7231,13 @@ pub(crate) mod tests {
client_config.set_cid_len(crate::MAX_CID_LEN);
client_config.enable_multipath(true);
client_config.set_multipath_algorithm(MultipathAlgorithm::RoundRobin);
client_config.set_ack_eliciting_threshold(1);

let mut server_config = TestPair::new_test_config(true)?;
server_config.set_cid_len(crate::MAX_CID_LEN);
server_config.enable_multipath(true);
server_config.set_multipath_algorithm(MultipathAlgorithm::RoundRobin);
server_config.set_ack_eliciting_threshold(1);

let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?;
let mut blocks = vec![];
Expand Down
Loading
Loading