diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 8071612f6..a5e455810 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -19,10 +19,20 @@ jobs: submodules: 'recursive' - name: Update rust run: rustup update - - name: Build TQUIC library - run: cargo build -F ffi --verbose - - name: Build TQUIC tools - run: cargo build --all --verbose + - name: Build TQUIC library and tools + run: cargo build --all -F ffi --verbose + + build_macos: + name: Build for MacOS + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: 'recursive' + - name: Update rust + run: rustup update + - name: Build TQUIC library and tools + run: cargo build --all -F ffi --verbose build_ios: name: Build for iOS @@ -41,7 +51,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: build - args: --target=${{ env.TARGET }} --verbose + args: --target=${{ env.TARGET }} --verbose --features ffi build_android: name: Build for Android diff --git a/.github/workflows/tquic-goodput.yml b/.github/workflows/tquic-goodput.yml index 3d980eae8..17ba9d3ad 100644 --- a/.github/workflows/tquic-goodput.yml +++ b/.github/workflows/tquic-goodput.yml @@ -7,6 +7,7 @@ on: env: CARGO_TERM_COLOR: always + QUIC_IMAGES: lsquic=tquicgroup/qirls,picoquic=tquicgroup/qirpq,quiche=tquicgroup/qircq jobs: measure: @@ -14,7 +15,7 @@ jobs: strategy: matrix: - impl: [tquic,lsquic,quiche] + impl: [tquic,lsquic,picoquic,quiche] case: [goodput100k,goodput1m,goodput10m] cc: [cubic, bbr] @@ -34,10 +35,10 @@ jobs: - name: Run the interop tests run: | cd quic-interop-runner - python3 run.py -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=0 --rate_to_client=0" -j ${{ matrix.case }}-0-${{ matrix.cc }}-${{ matrix.impl }}.json - python3 run.py -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=1 --rate_to_client=1" -j ${{ matrix.case }}-1-${{ matrix.cc }}-${{ matrix.impl }}.json - python3 run.py -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=3 --rate_to_client=3" -j ${{ matrix.case }}-3-${{ matrix.cc }}-${{ matrix.impl }}.json - python3 run.py -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=5 --rate_to_client=5" -j ${{ matrix.case }}-5-${{ matrix.cc }}-${{ matrix.impl }}.json + python3 run.py -r $QUIC_IMAGES -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=0 --rate_to_client=0" -j ${{ matrix.case }}-0-${{ matrix.cc }}-${{ matrix.impl }}.json + python3 run.py -r $QUIC_IMAGES -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=1 --rate_to_client=1" -j ${{ matrix.case }}-1-${{ matrix.cc }}-${{ matrix.impl }}.json + python3 run.py -r $QUIC_IMAGES -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=3 --rate_to_client=3" -j ${{ matrix.case }}-3-${{ matrix.cc }}-${{ matrix.impl }}.json + python3 run.py -r $QUIC_IMAGES -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=5 --rate_to_client=5" -j ${{ matrix.case }}-5-${{ matrix.cc }}-${{ matrix.impl }}.json - name: Store measurement results uses: actions/upload-artifact@v3 diff --git a/CHANGELOG.md b/CHANGELOG.md index 492b6d4a7..cacade0d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,30 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [v0.5.0] - 2024-01-03 + +### Added +- Add support for building on MacOS +- Add support for stateless reset +- Relaese tls_conf_selector as soon as the hanshake is completed. +- Add linear mode and upper limit for probe timeout +- Add FFI enable_multipath()/set_multipath_algorithm() +- Add RoundRobin multipath scheduler +- Add more units test for mulitpath transport +- tquic_client: stop trying and exit if it fails to reconnect the server multiple times. +- tquic_client: output the stats first and then exit when it receives an SIGINT signal. + +### Changed +- Simplify FFI quic_set_logger() to avoid from return unnessary errors +- Rename set_multipath() in Config to enable_multipath() +- Rename set_multipath_algor() in Config to set_multipath_algorithm() +- Change default congestion control algorithm to BBR + +### Fixed +- Fix stream scheduling for multipe incredmental streams +- Fix reinjection for multipath transport + + ## [v0.4.0] - 2023-12-18 ### Added @@ -77,6 +101,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Provide example clients and servers. +[v0.5.0]: https://github.com/tencent/tquic/compare/v0.4.0...v0.5.0 [v0.4.0]: https://github.com/tencent/tquic/compare/v0.3.0...v0.4.0 [v0.3.0]: https://github.com/tencent/tquic/compare/v0.2.0...v0.3.0 [v0.2.0]: https://github.com/tencent/tquic/compare/v0.1.0...v0.2.0 diff --git a/Cargo.toml b/Cargo.toml index 07d94bebd..790052a21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic" -version = "0.4.0" +version = "0.5.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" diff --git a/cbindgen.toml b/cbindgen.toml index 7a423c513..65133e044 100644 --- a/cbindgen.toml +++ b/cbindgen.toml @@ -40,6 +40,7 @@ exclude = ["MAX_CID_LEN", "MIN_CLIENT_INITIAL_LEN"] "TlsConfigSelectMethods" = "quic_tls_config_select_methods_t" "TlsConfigSelectorContext" = "quic_tls_config_select_context_t" "CongestionControlAlgorithm" = "quic_congestion_control_algorithm" +"LevelFilter" = "quic_log_level" "Http3Connection" = "http3_conn_t" "Http3Config" = "http3_config_t" "Http3Context" = "http3_context_t" diff --git a/include/tquic.h b/include/tquic.h index fb06b7b31..d338dbed6 100644 --- a/include/tquic.h +++ b/include/tquic.h @@ -53,6 +53,36 @@ typedef enum quic_congestion_control_algorithm { QUIC_CONGESTION_CONTROL_ALGORITHM_COPA, } quic_congestion_control_algorithm; +/** + * Available multipath scheduling algorithms. + */ +typedef enum quic_multipath_algorithm { + /** + * The scheduler sends packets over the path with the lowest smoothed RTT + * among all available paths. It aims to optimize throughput and achieve + * load balancing, making it particularly advantageous for bulk transfer + * applications in heterogeneous networks. + */ + MULTIPATH_ALGORITHM_MIN_RTT, + /** + * The scheduler sends all packets redundantly on all available paths. It + * utilizes additional bandwidth to minimize latency, thereby reducing the + * overall flow completion time for applications with bounded bandwidth + * requirements that can be met by a single path. + * In scenarios where two paths with varying available bandwidths are + * present, it ensures a goodput at least equivalent to the best single + * path. + */ + MULTIPATH_ALGORITHM_REDUNDANT, + /** + * The scheduler sends packets over available paths in a round robin + * manner. It aims to fully utilize the capacity of each path as the + * distribution across all path is equal. It is only used for testing + * purposes. + */ + MULTIPATH_ALGORITHM_ROUND_ROBIN, +} quic_multipath_algorithm; + /** * The stream's side to shutdown. */ @@ -360,11 +390,41 @@ void quic_config_set_min_congestion_window(struct quic_config_t *config, uint64_ */ void quic_config_set_initial_rtt(struct quic_config_t *config, uint64_t v); +/** + * Set the linear factor for calculating the probe timeout. + * The endpoint do not backoff the first `v` consecutive probe timeouts. + * The default value is `0`. + * The configuration should be changed with caution. Setting a value greater than the default + * will cause retransmission to be more aggressive. + */ +void quic_config_set_pto_linear_factor(struct quic_config_t *config, uint64_t v); + +/** + * Set the upper limit of probe timeout in milliseconds. + * A Probe Timeout (PTO) triggers the sending of one or two probe datagrams and enables a + * connection to recover from loss of tail packets or acknowledgments. + * See RFC 9002 Section 6.2. + */ +void quic_config_set_max_pto(struct quic_config_t *config, uint64_t v); + /** * Set the `active_connection_id_limit` transport parameter. */ void quic_config_set_active_connection_id_limit(struct quic_config_t *config, uint64_t v); +/** + * Set the `enable_multipath` transport parameter. + * The default value is false. (Experimental) + */ +void quic_config_enable_multipath(struct quic_config_t *config, bool enabled); + +/** + * Set the multipath scheduling algorithm + * The default value is MultipathAlgorithm::MinRtt + */ +void quic_config_set_multipath_algorithm(struct quic_config_t *config, + enum quic_multipath_algorithm v); + /** * Set the maximum size of the connection flow control window. */ @@ -630,6 +690,11 @@ bool quic_conn_is_closed(struct quic_conn_t *conn); */ bool quic_conn_is_idle_timeout(struct quic_conn_t *conn); +/** + * Check whether the connection was closed due to stateless reset. + */ +bool quic_conn_is_reset(struct quic_conn_t *conn); + /** * Returns the error from the peer, if any. */ @@ -911,16 +976,44 @@ int http3_take_priority_update(struct http3_conn_t *conn, void *argp), void *argp); +/** + * An enum representing the available verbosity level filters of the logger. + */ +typedef enum quic_log_level { + /** + * A level lower than all log levels. + */ + QUIC_LOG_LEVEL_OFF, + /** + * Corresponds to the `Error` log level. + */ + QUIC_LOG_LEVEL_ERROR, + /** + * Corresponds to the `Warn` log level. + */ + QUIC_LOG_LEVEL_WARN, + /** + * Corresponds to the `Info` log level. + */ + QUIC_LOG_LEVEL_INFO, + /** + * Corresponds to the `Debug` log level. + */ + QUIC_LOG_LEVEL_DEBUG, + /** + * Corresponds to the `Trace` log level. + */ + QUIC_LOG_LEVEL_TRACE, +} quic_log_level; + /** * Set logger. * `cb` is a callback function that will be called for each log message. * `line` is a null-terminated log message and `argp` is user-defined data that will be passed to * the callback. - * `level` is the log level filter, valid values are "OFF", "ERROR", "WARN", "INFO", "DEBUG", "TRACE". + * `level` represents the log level. */ -int quic_set_logger(void (*cb)(const uint8_t *line, void *argp), - void *argp, - const char *level); +void quic_set_logger(void (*cb)(const uint8_t *line, void *argp), void *argp, quic_log_level level); typedef enum http3_error { HTTP3_NO_ERROR = 0, diff --git a/src/congestion_control/congestion_control.rs b/src/congestion_control/congestion_control.rs index 46e580556..84c9d5fbd 100644 --- a/src/congestion_control/congestion_control.rs +++ b/src/congestion_control/congestion_control.rs @@ -34,14 +34,13 @@ pub use cubic::Cubic; pub use cubic::CubicConfig; pub use hystart_plus_plus::HystartPlusPlus; -/// Available congestion control algorithm +/// Available congestion control algorithms. #[repr(C)] #[derive(Eq, PartialEq, Debug, Clone, Copy, Default)] pub enum CongestionControlAlgorithm { /// CUBIC uses a cubic function instead of a linear window increase function /// of the current TCP standards to improve scalability and stability under /// fast and long-distance networks.. - #[default] Cubic, /// BBR uses recent measurements of a transport connection's delivery rate, @@ -49,6 +48,7 @@ pub enum CongestionControlAlgorithm { /// network path. The model is then used to control data transmission speed /// and the maximum volume of data allowed in flight in the network at any /// time. + #[default] Bbr, /// BBRv3 is the latest version of BBR, including various fixes and @@ -248,7 +248,7 @@ mod tests { let mut config = Config::new()?; let cc = build_congestion_controller(&config.recovery); - assert_eq!(cc.name(), "CUBIC"); + assert_eq!(cc.name(), "BBR"); assert_eq!(cc.in_slow_start(), true); assert_eq!(cc.in_recovery(Instant::now()), false); assert_eq!( diff --git a/src/connection/cid.rs b/src/connection/cid.rs index adca8e0d8..e9aea29a4 100644 --- a/src/connection/cid.rs +++ b/src/connection/cid.rs @@ -28,13 +28,13 @@ pub struct ConnectionIdItem { /// The Connection ID. pub cid: ConnectionId, - /// Its associated sequence number. + /// The associated sequence number. pub seq: u64, - /// Its associated reset token. Initial CIDs may not have any reset token. + /// The associated reset token. pub reset_token: Option<u128>, - /// The path identifier using this CID, if any. + /// The path using the Connection ID. pub path_id: Option<usize>, } diff --git a/src/connection/connection.rs b/src/connection/connection.rs index 1598f012e..483f45210 100644 --- a/src/connection/connection.rs +++ b/src/connection/connection.rs @@ -202,7 +202,6 @@ impl Connection { let trace_id = format!("{}-{}", if is_server { "SERVER" } else { "CLIENT" }, scid); let mut path = path::Path::new(local, remote, true, &conf.recovery, &trace_id); - if is_server { // The server connection is created upon receiving a Initial packet // with a valid token sent by the client. @@ -216,8 +215,11 @@ impl Connection { let paths = path::PathMap::new(path, cid_limit, is_server); let active_pid = paths.get_active_path_id()?; - let reset_token = if is_server { - conf.local_transport_params.stateless_reset_token + let reset_token = if is_server && conf.stateless_reset { + // Note that clients cannot use the stateless_reset_token transport + // parameter because their transport parameters do not have + // confidentiality protection + Some(ResetToken::generate(&conf.reset_token_key, scid).to_u128()) } else { None }; @@ -284,6 +286,7 @@ impl Connection { conn.local_transport_params.retry_source_connection_id = addr_token.rscid; conn.flags.insert(DidRetry); } + conn.local_transport_params.stateless_reset_token = reset_token; conn.set_transport_params()?; // Derive initial secrets for the client. @@ -1150,6 +1153,13 @@ impl Connection { } } + // The remote server can issue a stateless_reset_token transport parameter + // that applies to the connection ID that it selected during the handshake. + if let Some(reset_token) = peer_params.stateless_reset_token { + let reset_token = ResetToken::from_u128(reset_token); + self.events.add(Event::ResetTokenAdvertised(reset_token)); + } + self.set_peer_trans_params(peer_params)?; self.flags.insert(AppliedPeerTransportParams); @@ -2167,7 +2177,7 @@ impl Connection { let out = &mut out[st.written..]; if (pkt_type != PacketType::OneRTT && pkt_type != PacketType::ZeroRTT) || self.is_closing() - || out.len() <= frame::MIN_STREAM_OVERHEAD + || out.len() <= frame::MAX_STREAM_OVERHEAD || !self.paths.get(path_id)?.active() { return Ok(()); @@ -2198,17 +2208,12 @@ impl Connection { // 3. encode the frame header with the updated frame header segments. let frame_hdr_len = frame::stream_header_wire_len(stream_id, stream_off); - // If the buffer is too short, we won't attempt to write any more stream frames into it. - if cap.checked_sub(frame_hdr_len).is_none() { - break; - } - // Read stream data and write into the packet buffer directly. let (frame_data_len, fin) = stream.send.read(&mut out[len + frame_hdr_len..])?; // Retain stream data for reinjection if needed. let data = if self.flags.contains(EnableMultipath) - && reinjection_required(self.multipath_conf.multipath_algor) + && reinjection_required(self.multipath_conf.multipath_algorithm) { let start = len + frame_hdr_len; Bytes::copy_from_slice(&out[start..start + frame_data_len]) @@ -2244,6 +2249,11 @@ impl Connection { if !stream.is_sendable() { self.streams.remove_sendable(); } + + // If the buffer is too short, we won't attempt to write any more stream frames into it. + if cap <= frame::MAX_STREAM_OVERHEAD { + break; + } } Ok(()) @@ -2308,7 +2318,8 @@ impl Connection { .ok_or(Error::InternalError)?; let frames = &mut space.reinject.frames; debug!( - "try_write_reinjected_frames(): path id {} reinject queue size {}", + "{} try to write reinjected frames: path_id={} reinjected_frames={}", + self.trace_id, path_id, frames.len() ); @@ -2563,6 +2574,9 @@ impl Connection { } /// Select a available path for sending packet + /// + /// The selected path should have a packet that can be sent out, unless none + /// of the paths are feasible. fn select_send_path(&mut self) -> Result<usize> { // Select a unvalidated path with path probing packets to send if self.is_established() { @@ -2583,18 +2597,16 @@ impl Connection { // Select a validated path with sufficient congestion window by the // multipath scheduler. if self.need_send_path_unaware_frames() { - let scheduler = match self.multipath_scheduler { + let s = match self.multipath_scheduler { Some(ref mut scheduler) => scheduler, None => return Err(Error::InternalError), }; - if let Ok(pid) = - scheduler.on_select(&mut self.paths, &mut self.spaces, &mut self.streams) - { + if let Ok(pid) = s.on_select(&mut self.paths, &mut self.spaces, &mut self.streams) { return Ok(pid); } } - // Select a validated path with ACK/PTO packets to send. + // Select a validated path with ACK/PTO/Reinjected packets to send. for (pid, path) in self.paths.iter_mut() { if !path.active() { continue; @@ -2607,6 +2619,9 @@ impl Connection { if space.loss_probes > 0 { return Ok(pid); } + if space.need_send_reinjected_frames() && path.recovery.can_send() { + return Ok(pid); + } continue; } None => continue, @@ -3000,6 +3015,11 @@ impl Connection { self.flags.contains(HandshakeTimeout) } + /// Check whether the connection was closed due to stateless reset. + pub fn is_reset(&self) -> bool { + self.flags.contains(GotReset) + } + /// Close the connection. pub fn close(&mut self, app: bool, err: u64, reason: &[u8]) -> Result<()> { if self.is_closed() || self.is_draining() { @@ -3020,6 +3040,22 @@ impl Connection { Ok(()) } + /// Mark the connection as stateless reset by the peer. + pub(crate) fn reset(&mut self) { + if self.is_closed() || self.is_draining() { + return; + } + + // The connection is reset by the peer and it MUST enter the draining + // period and not send any further packets on this connection. + self.flags.insert(GotReset); + if let Ok(p) = self.paths.get_active_mut() { + let pto = p.recovery.rtt.pto_base(); + let now = time::Instant::now(); + self.timers.set(Timer::Draining, now + pto * 3); + } + } + /// Returns the error from the peer, if any. pub fn peer_error(&self) -> Option<&ConnectionError> { self.peer_error.as_ref() @@ -3831,30 +3867,33 @@ enum ConnectionFlags { /// The connection was closed due to handshake timeout. HandshakeTimeout = 1 << 11, + /// The connection was closed due to stateless reset. + GotReset = 1 << 12, + /// An ack-eliciting packet should be sent. - NeedSendAckEliciting = 1 << 12, + NeedSendAckEliciting = 1 << 13, /// A NewToken frame should be sent. - NeedSendNewToken = 1 << 13, + NeedSendNewToken = 1 << 14, /// A HandshakeDone frame should be sent. - NeedSendHandshakeDone = 1 << 14, + NeedSendHandshakeDone = 1 << 15, /// The client has acknowledged the server's HandshakeDone. - HandshakeDoneAcked = 1 << 15, + HandshakeDoneAcked = 1 << 16, /// The connection has sent an ack-eliciting packet since receiving a packet. /// It is used for resetting Idle timer. - SentAckElicitingSinceRecvPkt = 1 << 16, + SentAckElicitingSinceRecvPkt = 1 << 17, /// The connection is in the tickable queue of the endpoint. - Tickable = 1 << 17, + Tickable = 1 << 18, /// The connection is in the sendable queue of the endpoint. - Sendable = 1 << 18, + Sendable = 1 << 19, /// The multipath extension is successfully negotiated. - EnableMultipath = 1 << 19, + EnableMultipath = 1 << 20, } /// Statistics about a QUIC connection. @@ -4220,7 +4259,7 @@ pub(crate) mod tests { conf.set_address_token_lifetime(3600); conf.set_send_batch_size(2); conf.set_max_handshake_timeout(0); - conf.set_multipath(false); + conf.enable_multipath(false); let application_protos = vec![b"h3".to_vec()]; let tls_config = if !is_server { @@ -4820,8 +4859,6 @@ pub(crate) mod tests { server_config.set_tls_config_selector(conf_selector.clone()); for i in 0..conf_selector.len() { - assert_eq!(Arc::strong_count(&conf_selector), 2); - let mut test_pair = TestPair::new_with_server_name( &mut client_config, &mut server_config, @@ -4831,7 +4868,6 @@ pub(crate) mod tests { assert!(test_pair.handshake().is_ok()); assert!(test_pair.client.is_established()); assert!(test_pair.server.is_established()); - assert_eq!(Arc::strong_count(&conf_selector), 3); } Ok(()) @@ -4873,10 +4909,10 @@ pub(crate) mod tests { ]; for case in cases { let mut client_config = TestPair::new_test_config(false)?; - client_config.set_multipath(case.0); + client_config.enable_multipath(case.0); client_config.set_cid_len(case.1); let mut server_config = TestPair::new_test_config(true)?; - server_config.set_multipath(case.2); + server_config.enable_multipath(case.2); server_config.set_cid_len(case.3); let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?; @@ -5998,6 +6034,54 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn conn_multi_incremental_streams_send_round_robin() -> Result<()> { + let server_transport_params = TransportParams { + initial_max_data: 2000, + initial_max_stream_data_bidi_remote: 2000, + initial_max_streams_bidi: 4, + ..TransportParams::default() + }; + + let mut client_config = TestPair::new_test_config(false)?; + let mut server_config = TestPair::new_test_config(true)?; + server_config.local_transport_params = server_transport_params.clone(); + + let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?; + assert_eq!(test_pair.handshake(), Ok(())); + + // 1. Client create four bidi streams [0, 4, 8, 12], and write data on them + let data = TestPair::new_test_data(500); + for i in 0..4 { + assert_eq!( + test_pair.client.stream_write(i * 4, data.clone(), true)?, + data.len() + ); + } + + // 2. Try to send stream data in round-robin order + let mut packets = Vec::new(); + for i in 0..4 { + let mut out = vec![0u8; 100]; + let info = match test_pair.client.send(&mut out) { + Ok((written, info)) => { + out.truncate(written); + info + } + Err(e) => return Err(e), + }; + packets.push((out, info)); + } + + // 3. Server recv stream data, all streams must be readable + TestPair::conn_packets_in(&mut test_pair.server, packets)?; + for i in 0..4 { + assert!(test_pair.server.stream_readable(i * 4)); + } + + Ok(()) + } + #[test] fn conn_max_streams_bidi() -> Result<()> { let mut test_pair = TestPair::new_with_test_config()?; @@ -6200,50 +6284,131 @@ pub(crate) mod tests { Ok(()) } - #[test] - fn conn_multipath_transfer() -> Result<()> { - let mut client_config = TestPair::new_test_config(false)?; - client_config.set_cid_len(crate::MAX_CID_LEN); - client_config.set_multipath(true); - client_config.set_multipath_algor(MultipathAlgorithm::Redundant); - let mut server_config = TestPair::new_test_config(true)?; - server_config.set_cid_len(crate::MAX_CID_LEN); - server_config.set_multipath(true); - server_config.set_multipath_algor(MultipathAlgorithm::MinRtt); - + // Establish a multipath connection between the client and server and then + // send data blocks from the client to the server. + // + // The size of data block in `blocks` should be less than 256. + fn conn_multipath_transfer(test_pair: &mut TestPair, blocks: Vec<Bytes>) -> Result<()> { // Handshake with multipath enabled - let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?; test_pair.handshake()?; - assert_eq!(test_pair.client.paths_iter().count(), 1); - assert_eq!(test_pair.server.paths_iter().count(), 1); + assert!(test_pair.client.is_multipath()); + assert!(test_pair.server.is_multipath()); // Client and server advertise new cids test_pair.advertise_new_cids()?; - // Client try to add path again + // Client try to add a new path + assert_eq!(test_pair.client.paths_iter().count(), 1); + assert_eq!(test_pair.server.paths_iter().count(), 1); let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9444); let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 443); test_pair.add_and_validate_path(client_addr, server_addr)?; assert_eq!(test_pair.client.paths_iter().count(), 2); assert_eq!(test_pair.server.paths_iter().count(), 2); - for (data, fin) in vec![ - (Bytes::from_static(b"Everything"), false), - (Bytes::from_static(b"Over"), false), - (Bytes::from_static(b"Multipath QUIC"), true), - ] { + // Client send bytes over multipath + let mut buf = vec![0; 2048]; + for data in blocks.iter() { // Client write and send data on stream 4 let len = data.len(); - assert_eq!(test_pair.client.stream_write(4, data.clone(), fin), Ok(len)); + assert_eq!( + test_pair.client.stream_write(4, data.clone(), false), + Ok(len) + ); let packets = TestPair::conn_packets_out(&mut test_pair.client)?; // Server recv and read data on stream 4 TestPair::conn_packets_in(&mut test_pair.server, packets)?; - let mut buf = vec![0; 18]; - assert_eq!(test_pair.server.stream_read(4, &mut buf)?, (len, fin)); + assert_eq!(test_pair.server.stream_read(4, &mut buf)?, (len, false)); assert_eq!(&buf[..len], &data[..]); + + // Server reply ack + let packets = TestPair::conn_packets_out(&mut test_pair.server)?; + TestPair::conn_packets_in(&mut test_pair.client, packets)?; } + let packets = TestPair::conn_packets_out(&mut test_pair.server)?; + TestPair::conn_packets_in(&mut test_pair.client, packets)?; + Ok(()) + } + + #[test] + fn conn_multipath_transfer_minrtt() -> Result<()> { + let mut client_config = TestPair::new_test_config(false)?; + client_config.set_cid_len(crate::MAX_CID_LEN); + client_config.enable_multipath(true); + client_config.set_multipath_algorithm(MultipathAlgorithm::MinRtt); + + let mut server_config = TestPair::new_test_config(true)?; + server_config.set_cid_len(crate::MAX_CID_LEN); + server_config.enable_multipath(true); + server_config.set_multipath_algorithm(MultipathAlgorithm::MinRtt); + + let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?; + let mut blocks = vec![]; + for i in 0..1000 { + blocks.push(Bytes::from_static(b"Everything over multipath")); + } + conn_multipath_transfer(&mut test_pair, blocks)?; + // Note: The scheduling result is uncertain, so we only verify if the + // transmission was successful. + Ok(()) + } + + #[test] + fn conn_multipath_transfer_redundant() -> Result<()> { + let mut client_config = TestPair::new_test_config(false)?; + client_config.set_cid_len(crate::MAX_CID_LEN); + client_config.enable_multipath(true); + client_config.set_multipath_algorithm(MultipathAlgorithm::Redundant); + let mut server_config = TestPair::new_test_config(true)?; + server_config.set_cid_len(crate::MAX_CID_LEN); + + // Handshake with multipath enabled + server_config.enable_multipath(true); + server_config.set_multipath_algorithm(MultipathAlgorithm::Redundant); + let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?; + + let blocks = vec![ + Bytes::from_static(b"Everything"), + Bytes::from_static(b"Over"), + Bytes::from_static(b"Multipath QUIC"), + ]; + + conn_multipath_transfer(&mut test_pair, blocks)?; + + for (i, path) in test_pair.server.paths.iter() { + let s = path.stats(); + assert!(s.sent_count > 3); + assert!(s.recv_count > 3); + } + Ok(()) + } + + #[test] + fn conn_multipath_transfer_roundrobin() -> Result<()> { + let mut client_config = TestPair::new_test_config(false)?; + client_config.set_cid_len(crate::MAX_CID_LEN); + client_config.enable_multipath(true); + client_config.set_multipath_algorithm(MultipathAlgorithm::RoundRobin); + + let mut server_config = TestPair::new_test_config(true)?; + server_config.set_cid_len(crate::MAX_CID_LEN); + server_config.enable_multipath(true); + server_config.set_multipath_algorithm(MultipathAlgorithm::RoundRobin); + + let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?; + let mut blocks = vec![]; + for i in 0..100 { + blocks.push(Bytes::from_static(b"Everything over multipath")); + } + conn_multipath_transfer(&mut test_pair, blocks)?; + + for (i, path) in test_pair.server.paths.iter() { + let s = path.stats(); + assert!(s.sent_count > 50); + assert!(s.recv_count > 50); + } Ok(()) } diff --git a/src/connection/path.rs b/src/connection/path.rs index 2958aa279..4fb6b6d94 100644 --- a/src/connection/path.rs +++ b/src/connection/path.rs @@ -536,10 +536,12 @@ mod tests { RecoveryConfig { max_datagram_size: 1200, max_ack_delay: time::Duration::from_millis(0), - congestion_control_algorithm: CongestionControlAlgorithm::Cubic, + congestion_control_algorithm: CongestionControlAlgorithm::Bbr, min_congestion_window: 2_u64, - initial_congestion_window: 8_u64, + initial_congestion_window: 10_u64, initial_rtt: crate::INITIAL_RTT, + pto_linear_factor: crate::DEFAULT_PTO_LINEAR_FACTOR, + max_pto: crate::MAX_PTO, } } diff --git a/src/connection/recovery.rs b/src/connection/recovery.rs index ce86497fd..aa40d8fa4 100644 --- a/src/connection/recovery.rs +++ b/src/connection/recovery.rs @@ -55,6 +55,12 @@ pub struct Recovery { /// The maximum size of outgoing UDP payloads. pub max_datagram_size: usize, + /// The endpoint do not backoff the first `pto_linear_factor` consecutive probe timeouts. + pto_linear_factor: u64, + + /// Upper limit of probe timeout. + max_pto: Duration, + /// The number of times a PTO has been sent without receiving an /// acknowledgment. It is used for PTO calculation. pto_count: usize, @@ -91,6 +97,8 @@ impl Recovery { Recovery { max_ack_delay: conf.max_ack_delay, max_datagram_size: conf.max_datagram_size, + pto_linear_factor: conf.pto_linear_factor, + max_pto: conf.max_pto, pto_count: 0, loss_detection_timer: None, pkt_thresh: INITIAL_PACKET_THRESHOLD, @@ -600,6 +608,30 @@ impl Recovery { (time, sid) } + /// Calculate the probe timeout. + fn calculate_pto(&self) -> Duration { + let backoff_factor = self + .pto_count + .saturating_sub(self.pto_linear_factor as usize); + + cmp::min( + self.rtt.pto_base() * 2_u32.saturating_pow(backoff_factor as u32), + self.max_pto, + ) + } + + /// Calculate the probe timeout include `max_ack_delay`. + fn pto_with_ack_delay(&self, duration: Duration) -> Duration { + let backoff_factor = self + .pto_count + .saturating_sub(self.pto_linear_factor as usize); + + cmp::min( + duration + self.max_ack_delay * 2_u32.saturating_pow(backoff_factor as u32), + self.max_pto, + ) + } + /// Return the min pto time and the corresponding space fn get_pto_time_and_space( &self, @@ -608,9 +640,7 @@ impl Recovery { handshake_status: HandshakeStatus, now: Instant, ) -> (Option<Instant>, SpaceId) { - // When a PTO timer expires, the PTO backoff MUST be increased, - // resulting in the PTO period being set to twice its current value. - let mut duration = self.rtt.pto_base() * 2_u32.pow(self.pto_count as u32); + let mut duration = self.calculate_pto(); // Arm PTO from now when there are no inflight packets. if self.bytes_in_flight == 0 { @@ -646,7 +676,7 @@ impl Recovery { return (pto_timeout, pto_space); } // Include max_ack_delay and backoff for Application Data. - duration += self.max_ack_delay * 2_u32.pow(self.pto_count as u32); + duration = self.pto_with_ack_delay(duration); } let new_time = space @@ -749,10 +779,12 @@ mod tests { RecoveryConfig { max_datagram_size: 1200, max_ack_delay: Duration::from_millis(100), - congestion_control_algorithm: CongestionControlAlgorithm::Cubic, + congestion_control_algorithm: CongestionControlAlgorithm::Bbr, min_congestion_window: 2_u64, - initial_congestion_window: 8_u64, + initial_congestion_window: 10_u64, initial_rtt: crate::INITIAL_RTT, + pto_linear_factor: crate::DEFAULT_PTO_LINEAR_FACTOR, + max_pto: crate::MAX_PTO, } } @@ -1153,4 +1185,49 @@ mod tests { Ok(()) } + + const MAX_PTO_UT: Duration = Duration::from_secs(30); + + fn calculate_pto_with_count(count: usize) -> (Duration, Duration) { + let mut conf = new_test_recovery_config(); + conf.pto_linear_factor = 2; + conf.max_pto = MAX_PTO_UT; + let mut recovery = Recovery::new(&conf); + recovery.pto_count = count; + + let duration = recovery.calculate_pto(); + (duration, recovery.pto_with_ack_delay(duration)) + } + + #[test] + fn calculate_pto() -> Result<()> { + assert_eq!( + calculate_pto_with_count(0), + ( + Duration::from_millis(999), // 999 * 2 ^ ( 2 - 2) + Duration::from_millis(1099) // (999 + 100) * 2 ^ 0, max_ack_delay is 100ms. + ) + ); + + assert_eq!( + calculate_pto_with_count(2), + ( + Duration::from_millis(999), // 999 * 2 ^ 0 + Duration::from_millis(1099) // (999 + 100) * 2 ^ 0 + ) + ); + + assert_eq!( + calculate_pto_with_count(3), + ( + Duration::from_millis(1998), // 999 * 2 ^ ( 2 - 2) + Duration::from_millis(2198) // (999 + 100) * 2 ^ ( 3 - 2) + ) + ); + + // PTO reach the upper limit. + assert_eq!(calculate_pto_with_count(100), (MAX_PTO_UT, MAX_PTO_UT)); + + Ok(()) + } } diff --git a/src/connection/space.rs b/src/connection/space.rs index c5200f58f..c998c4996 100644 --- a/src/connection/space.rs +++ b/src/connection/space.rs @@ -181,6 +181,11 @@ impl PacketNumSpace { // See RFC 9002 Section 6.2.4 self.loss_probes > 0 } + + /// Return whether the space should send a reinjection packet. + pub fn need_send_reinjected_frames(&self) -> bool { + !self.reinject.frames.is_empty() + } } /// All packet number spaces on a QUIC connection @@ -252,7 +257,7 @@ impl PacketNumSpaceMap { /// Return whether the connection should send a reinjection packet. pub fn need_send_reinjected_frames(&self) -> bool { for space in self.spaces.values() { - if !space.reinject.frames.is_empty() { + if space.need_send_reinjected_frames() { return true; } } diff --git a/src/endpoint.rs b/src/endpoint.rs index 87e56b97e..24b175a6a 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -195,11 +195,22 @@ impl Endpoint { let (mut hdr, _) = PacketHeader::from_bytes(buf, cid_len)?; let (local, remote) = (info.dst, info.src); - // Delivery the datagram to the target connection. - // TODO: support stateless reset - if let Some(c) = self.routes.find(&hdr.dcid, buf, info) { + // Try to delivery the datagram to the target connection. + if let (Some(c), reset) = self.routes.find(&hdr.dcid, buf, info) { if let Some(conn) = self.conns.get_mut(*c) { conn.mark_tickable(true); + // Detected a Stateless Reset packet for an existing connection + if reset && self.config.stateless_reset { + trace!( + "{} connection {:?} got stateless reset from {:?}", + &self.trace_id, + conn.trace_id(), + info + ); + conn.reset(); + return Ok(()); + } + conn.recv(buf, info).map(|_| ())?; return Ok(()); } @@ -279,11 +290,6 @@ impl Endpoint { // Send the Stateless Reset packet for the unknown connection if hdr.pkt_type == PacketType::OneRTT && !hdr.dcid.is_empty() && self.config.stateless_reset { - trace!( - "endpoint send stateless retry: remote {:?} local {:?}", - remote, - local - ); self.send_stateless_reset(buf.len(), &hdr.dcid, local, remote)?; return Ok(()); } @@ -381,7 +387,7 @@ impl Endpoint { } /// Write an Stateless Reset packet which will be sent later. - /// TODO: limit based on address + /// TODO: rate limit based on address fn send_stateless_reset( &mut self, pkt_in_len: usize, @@ -389,20 +395,20 @@ impl Endpoint { local: SocketAddr, remote: SocketAddr, ) -> Result<()> { - // Select padding length - const MIN_PADDING_LEN: usize = 5; - const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + crate::MAX_CID_LEN; - let padding_len = { - let max_padding_len = match pkt_in_len.checked_sub(crate::RESET_TOKEN_LEN) { - Some(v) if v > MIN_PADDING_LEN => v - 1, - _ => return Ok(()), - }; - if max_padding_len <= IDEAL_MIN_PADDING_LEN { - max_padding_len - } else { - rand::thread_rng().gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len) - } - }; + // Endpoints MUST discard packets that are too small to be valid QUIC packets. + // The smallest possible valid 1rtt packet is: 1 byte of header + cid_len + + // 1 byte of packet number + 1 byte of payload + 16 bytes of AEAD overhead. + if pkt_in_len < self.cid_gen.cid_len() + 19 { + return Ok(()); + } + + // Generate a stateless reset that is as short as possible, but long enough + // to be difficult to distinguish from one rtt packets. + let pkt_out_len = pkt_in_len - 1; + if pkt_out_len < crate::MIN_RESET_PACKET_LEN { + return Ok(()); + } + let pkt_out_len = cmp::min(pkt_out_len, crate::MAX_RESET_PACKET_LEN); // Generate stateless reset token based on the dcid. let key = &self.config.reset_token_key; @@ -410,7 +416,7 @@ impl Endpoint { // Write a Stateless Reset packet. let mut pkt_out = self.packets.get_buffer(); - let len = packet::stateless_reset(padding_len, &reset_token, &mut pkt_out[..])?; + let len = packet::stateless_reset(pkt_out_len, &reset_token, &mut pkt_out[..])?; pkt_out.truncate(len); let pkt_info = PacketInfo { @@ -419,7 +425,13 @@ impl Endpoint { time: Instant::now(), }; - trace!("{} send stateless reset {:?}", &self.trace_id, pkt_info); + trace!( + "{} send Stateless Reset {:?} token={:?} dcid_pkt_in={:?}", + &self.trace_id, + pkt_info, + reset_token, + dcid, + ); self.packets.add_packet(pkt_out, pkt_info); Ok(()) } @@ -529,6 +541,8 @@ impl Endpoint { Event::DcidRetired(token) => self.routes.remove_with_token(&token), + Event::ResetTokenAdvertised(token) => self.routes.insert_with_token(token, idx), + Event::StreamCreated(stream_id) => self.handler.on_stream_created(conn, stream_id), Event::StreamClosed(stream_id) => { @@ -837,7 +851,8 @@ impl ConnectionRoutes { } /// Find the target connection for the incoming datagram. - fn find(&self, dcid: &ConnectionId, buf: &mut [u8], info: &PacketInfo) -> Option<&u64> { + fn find(&self, dcid: &ConnectionId, buf: &mut [u8], info: &PacketInfo) -> (Option<&u64>, bool) { + let mut reset = false; let mut idx = if dcid.len() > 0 { // The packet has a non-zero-length Destination Connection ID // corresponding to an existing connection. @@ -858,11 +873,17 @@ impl ConnectionRoutes { // The endpoint identifies a received datagram as a Stateless Reset // by comparing the last 16 bytes of the datagram with all stateless // reset tokens. - let token = ResetToken::from_bytes(buf).ok()?; + let token = match ResetToken::from_bytes(buf) { + Ok(t) => t, + Err(_) => return (None, false), + }; idx = self.token_table.get(&token); + if idx.is_some() { + reset = true; + } } - idx + (idx, reset) } /// Insert the local cid and the connection. @@ -1502,6 +1523,17 @@ mod tests { packets: RefCell::new(Vec::new()), } } + + // Delivery the outgoing packets to the target endpoint + fn transfer(&self, e: &mut Endpoint) -> Result<usize> { + let count = self.packets.borrow().len(); + let mut packets = self.packets.borrow_mut(); + for (pkt, info) in packets.iter_mut() { + e.recv(pkt, info)?; + } + packets.clear(); + Ok(count) + } } impl PacketSendHandler for MockSocket { @@ -2133,92 +2165,122 @@ mod tests { Ok(()) } - fn endpoint_process_stateless_reset( - sock: Rc<MockSocket>, - packet_unknown: &mut [u8], - is_server: bool, - enable: bool, - ) -> Result<()> { - let mut conf = TestPair::new_test_config(is_server)?; - conf.enable_stateless_reset(enable); - let mut e = Endpoint::new( - Box::new(conf), - is_server, - Box::new(ServerHandler::new( - CaseConf::default(), - Arc::new(AtomicBool::new(false)), - )), - sock.clone(), - ); - let info = TestTool::new_test_packet_info(is_server); - - // Endpoint recv an unknown packet. - e.recv(packet_unknown, &info)?; - e.process_connections()?; - Ok(()) - } - #[test] - fn endpoint_stateless_reset_by_client() -> Result<()> { - let mut packet_unknown = TEST_INITIAL.clone(); - let sock = Rc::new(MockSocket::new()); - - // Client recv an Initial with unknown dcid - endpoint_process_stateless_reset(sock.clone(), &mut packet_unknown, false, true)?; - - // Client send stateless reset - let packets = sock.packets.borrow(); - assert!(packets.len() > 0); - let (packet, _) = &packets[0]; - let (hdr, _) = PacketHeader::from_bytes(&packet, 8)?; - assert_eq!(hdr.pkt_type, PacketType::OneRTT); - - Ok(()) - } - - #[test] - fn endpoint_stateless_reset_by_server() -> Result<()> { - let mut packet_stateless = TEST_STATELESS_RESET.clone(); - let sock = Rc::new(MockSocket::new()); - - // Server recv a stateless packet with unknown token. - endpoint_process_stateless_reset(sock.clone(), &mut packet_stateless, true, true)?; - - // Server send stateless reset - let packets = sock.packets.borrow(); - assert!(packets.len() > 0); + fn endpoint_stateless_reset_for_restart() -> Result<()> { + let new_endpoint = |is_server, conf, sock: Rc<MockSocket>| -> Endpoint { + Endpoint::new( + Box::new(conf), + is_server, + Box::new(ClientHandler::new( + CaseConf::default(), + Arc::new(AtomicBool::new(false)), + )), + sock.clone(), + ) + }; - let (packet, _) = &packets[0]; - let (hdr, _) = PacketHeader::from_bytes(&packet, 8)?; - assert_eq!(hdr.pkt_type, PacketType::OneRTT); - Ok(()) - } + // client endpoint + let mut client_conf = TestPair::new_test_config(false)?; + client_conf.enable_stateless_reset(true); + let client_sock = Rc::new(MockSocket::new()); + let mut client = new_endpoint(false, client_conf, client_sock.clone()); - #[test] - fn endpoint_stateless_reset_client_disabled() -> Result<()> { - let mut packet_stateless = TEST_STATELESS_RESET.clone(); - let sock = Rc::new(MockSocket::new()); + // server endpoint + let mut server_conf = TestPair::new_test_config(true)?; + server_conf.enable_stateless_reset(true); + server_conf.set_reset_token_key([1; 64]); + let server_sock = Rc::new(MockSocket::new()); + let mut server = new_endpoint(true, server_conf, server_sock.clone()); - // Client recv a stateless packet with unknown token. - endpoint_process_stateless_reset(sock.clone(), &mut packet_stateless, false, false)?; + // create a connection + let cli_addr: SocketAddr = "127.8.8.8:8888".parse().unwrap(); + let srv_addr: SocketAddr = "127.8.8.8:8443".parse().unwrap(); + let host = Some("example.org"); + let cli_conn = client.connect(cli_addr, srv_addr, host, None, None)?; + client.process_connections()?; + assert!(client_sock.transfer(&mut server)? > 0); + server.process_connections()?; + assert!(server_sock.transfer(&mut client)? > 0); + assert_eq!(client.conns.len(), 1); + assert_eq!(server.conns.len(), 1); + + // Fake restarting server after handshake + client.process_connections()?; + assert!(client_sock.transfer(&mut server)? > 0); + server.process_connections()?; + assert!(server_sock.transfer(&mut client)? > 0); + server.close(true); + + let mut server_conf = TestPair::new_test_config(true)?; + server_conf.enable_stateless_reset(true); + server_conf.set_reset_token_key([1; 64]); + let server_sock = Rc::new(MockSocket::new()); + let mut server = new_endpoint(true, server_conf, server_sock.clone()); + assert_eq!(client.conns.len(), 1); + assert_eq!(server.conns.len(), 0); + + // Client send packets to server + client.process_connections()?; + assert!(client_sock.transfer(&mut server)? > 0); + + // Server send Stateless Reset + server.process_connections()?; + assert!(server_sock.transfer(&mut client)? > 0); + + // Client detect Stateless Reset + client.process_connections()?; + let cli_conn = client.conn_get_mut(cli_conn).unwrap(); + assert!(cli_conn.is_reset()); - // Do nothing with stateless reset disabled. - let packets = sock.packets.borrow(); - assert!(packets.len() == 0); Ok(()) } #[test] - fn endpoint_stateless_reset_server_disabled() -> Result<()> { - let mut packet_stateless = TEST_STATELESS_RESET.clone(); - let sock = Rc::new(MockSocket::new()); + fn endpoint_stateless_reset_for_unknown_packet() -> Result<()> { + let cases = vec![ + // is_server, enable_reset, packet, got_reset + (false, true, Vec::from(TEST_INITIAL), true), + (false, false, Vec::from(TEST_INITIAL), false), + (false, true, Vec::from(TEST_STATELESS_RESET), true), + (false, false, Vec::from(TEST_STATELESS_RESET), false), + (true, true, Vec::from(TEST_INITIAL), false), + (true, false, Vec::from(TEST_INITIAL), false), + (true, true, Vec::from(TEST_STATELESS_RESET), true), + (true, false, Vec::from(TEST_STATELESS_RESET), false), + ]; + + for (is_server, enable_reset, pkt, got_reset) in cases { + let mut conf = TestPair::new_test_config(is_server)?; + conf.enable_stateless_reset(enable_reset); + let sock = Rc::new(MockSocket::new()); + let mut e = Endpoint::new( + Box::new(conf), + is_server, + Box::new(ServerHandler::new( + CaseConf::default(), + Arc::new(AtomicBool::new(false)), + )), + sock.clone(), + ); - // Server recv a stateless packet with unknown token. - endpoint_process_stateless_reset(sock.clone(), &mut packet_stateless, true, false)?; + // Endpoint recv an unknown packet. + let mut pkt_unknown = pkt.clone(); + let info = TestTool::new_test_packet_info(is_server); + e.recv(&mut pkt_unknown, &info)?; + + // Endpoint send stateless reset + e.process_connections()?; + let packets = sock.packets.borrow(); + if got_reset { + assert!(packets.len() > 0); + let (packet, _) = &packets[0]; + let (hdr, _) = PacketHeader::from_bytes(&packet, 8)?; + assert_eq!(hdr.pkt_type, PacketType::OneRTT); + } else { + assert!(packets.len() == 0); + } + } - // Do nothing with stateless reset disabled. - let packets = sock.packets.borrow(); - assert!(packets.len() == 0); Ok(()) } diff --git a/src/ffi.rs b/src/ffi.rs index 1e1c5a591..a986fe53d 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -215,12 +215,45 @@ pub extern "C" fn quic_config_set_initial_rtt(config: &mut Config, v: u64) { config.set_initial_rtt(v); } +/// Set the linear factor for calculating the probe timeout. +/// The endpoint do not backoff the first `v` consecutive probe timeouts. +/// The default value is `0`. +/// The configuration should be changed with caution. Setting a value greater than the default +/// will cause retransmission to be more aggressive. +#[no_mangle] +pub extern "C" fn quic_config_set_pto_linear_factor(config: &mut Config, v: u64) { + config.set_pto_linear_factor(v); +} + +/// Set the upper limit of probe timeout in milliseconds. +/// A Probe Timeout (PTO) triggers the sending of one or two probe datagrams and enables a +/// connection to recover from loss of tail packets or acknowledgments. +/// See RFC 9002 Section 6.2. +#[no_mangle] +pub extern "C" fn quic_config_set_max_pto(config: &mut Config, v: u64) { + config.set_max_pto(v); +} + /// Set the `active_connection_id_limit` transport parameter. #[no_mangle] pub extern "C" fn quic_config_set_active_connection_id_limit(config: &mut Config, v: u64) { config.set_active_connection_id_limit(v); } +/// Set the `enable_multipath` transport parameter. +/// The default value is false. (Experimental) +#[no_mangle] +pub extern "C" fn quic_config_enable_multipath(config: &mut Config, enabled: bool) { + config.enable_multipath(enabled); +} + +/// Set the multipath scheduling algorithm +/// The default value is MultipathAlgorithm::MinRtt +#[no_mangle] +pub extern "C" fn quic_config_set_multipath_algorithm(config: &mut Config, v: MultipathAlgorithm) { + config.set_multipath_algorithm(v); +} + /// Set the maximum size of the connection flow control window. #[no_mangle] pub extern "C" fn quic_config_set_max_connection_window(config: &mut Config, v: u64) { @@ -726,6 +759,12 @@ pub extern "C" fn quic_conn_is_idle_timeout(conn: &mut Connection) -> bool { conn.is_idle_timeout() } +/// Check whether the connection was closed due to stateless reset. +#[no_mangle] +pub extern "C" fn quic_conn_is_reset(conn: &mut Connection) -> bool { + conn.is_reset() +} + /// Returns the error from the peer, if any. #[no_mangle] pub extern "C" fn quic_conn_peer_error( @@ -1651,30 +1690,17 @@ fn headers_from_ptr<'a>(ptr: *const Header, len: size_t) -> Vec<h3::HeaderRef<'a /// `cb` is a callback function that will be called for each log message. /// `line` is a null-terminated log message and `argp` is user-defined data that will be passed to /// the callback. -/// `level` is the log level filter, valid values are "OFF", "ERROR", "WARN", "INFO", "DEBUG", "TRACE". +/// `level` represents the log level. #[no_mangle] pub extern "C" fn quic_set_logger( cb: extern "C" fn(line: *const u8, argp: *mut c_void), argp: *mut c_void, - level: *const c_char, -) -> c_int { + level: log::LevelFilter, +) { let argp = atomic::AtomicPtr::new(argp); let logger = Box::new(Logger { cb, argp }); - - if log::set_boxed_logger(logger).is_err() { - return -1; - } - - if level.is_null() { - return -1; - } - - let level = unsafe { ffi::CStr::from_ptr(level).to_str().unwrap_or_default() }; - if let Ok(level_filter) = log::LevelFilter::from_str(level) { - log::set_max_level(level_filter); - } - - 0 + let _ = log::set_boxed_logger(logger); + log::set_max_level(level); } #[repr(C)] diff --git a/src/frame.rs b/src/frame.rs index 61d9d34a9..310b5069f 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -34,8 +34,8 @@ use crate::Result; pub(crate) const MAX_STREAM_SIZE: u64 = 1 << 62; pub(crate) const MAX_CRYPTO_OVERHEAD: usize = 8; -pub(crate) const MAX_STREAM_OVERHEAD: usize = 12; -pub(crate) const MIN_STREAM_OVERHEAD: usize = 5; +// Type (1) + Stream ID (8) + Offset (8) + Length (2) +pub(crate) const MAX_STREAM_OVERHEAD: usize = 19; /// The QUIC frame is a unit of structured protocol information. Frames are /// contained in QUIC packets. diff --git a/src/lib.rs b/src/lib.rs index 51db65852..2b0feb20a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,7 +43,7 @@ //! //! ## Feature flags //! -//! TQUIC defines several [feature flags] to reduce the amount of compiled code +//! TQUIC defines several feature flags to reduce the amount of compiled code //! and dependencies: //! //! * `ffi`: Build and expose the FFI API. @@ -90,9 +90,16 @@ const MAX_CID_LIMIT: u64 = 8; /// The Stateless Reset Token is a 16-byte value. const RESET_TOKEN_LEN: usize = 16; -/// The minimum length of a Stateless Reset Packet. +/// For the Stateless Reset to appear as a valid QUIC packet, the Unpredictable +/// Bits field needs to include at least 38 bits of data. The minimum length of +/// a Statless Reset Packet is 21 bytes. const MIN_RESET_PACKET_LEN: usize = 21; +/// Assuming the maximum possible connection ID and packet number size, the 1RTT +/// packet size is: +/// 1 (header) + 20 (cid) + 4 (pkt num) + 1 (payload) + 16 (AEAD tag) = 42 bytes +const MAX_RESET_PACKET_LEN: usize = 42; + /// The encoded size of length field in long header. const LENGTH_FIELD_LEN: usize = 2; @@ -135,6 +142,12 @@ const INITIAL_RTT: Duration = Duration::from_millis(333); /// Default handshake timeout is 30 seconds. const DEFAULT_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30); +/// Default linear factor for calculating the probe timeout. +const DEFAULT_PTO_LINEAR_FACTOR: u64 = 0; + +/// Default upper limit of probe timeout. +const MAX_PTO: Duration = Duration::MAX; + /// Result type for quic operations. pub type Result<T> = std::result::Result<T, Error>; @@ -206,8 +219,7 @@ pub trait ConnectionIdGenerator { fn generate_cid_and_token(&mut self, reset_token_key: &hmac::Key) -> (ConnectionId, u128) { let scid = self.generate(); let reset_token = ResetToken::generate(reset_token_key, &scid); - let reset_token = u128::from_be_bytes(reset_token.0); - (scid, reset_token) + (scid, reset_token.to_u128()) } } @@ -469,7 +481,7 @@ impl Config { } /// Set congestion control algorithm that the connection would use. - /// The default value is Cubic. + /// The default value is Bbr. pub fn set_congestion_control_algorithm(&mut self, cca: CongestionControlAlgorithm) { self.recovery.congestion_control_algorithm = cca; } @@ -490,8 +502,25 @@ impl Config { /// /// The configuration should be changed with caution. Setting a value less than the default /// will cause retransmission of handshake packets to be more aggressive. - pub fn set_initial_rtt(&mut self, millisecs: u64) { - self.recovery.initial_rtt = cmp::max(Duration::from_millis(millisecs), TIMER_GRANULARITY); + pub fn set_initial_rtt(&mut self, millis: u64) { + self.recovery.initial_rtt = cmp::max(Duration::from_millis(millis), TIMER_GRANULARITY); + } + + /// Set the linear factor for calculating the probe timeout. + /// The endpoint do not backoff the first `v` consecutive probe timeouts. + /// The default value is `0`. + /// The configuration should be changed with caution. Setting a value greater than the default + /// will cause retransmission to be more aggressive. + pub fn set_pto_linear_factor(&mut self, v: u64) { + self.recovery.pto_linear_factor = v; + } + + /// Set the upper limit of probe timeout in milliseconds. + /// A Probe Timeout (PTO) triggers the sending of one or two probe datagrams and enables a + /// connection to recover from loss of tail packets or acknowledgments. + /// See RFC 9002 Section 6.2. + pub fn set_max_pto(&mut self, millis: u64) { + self.recovery.max_pto = cmp::max(Duration::from_millis(millis), TIMER_GRANULARITY); } /// Set the `active_connection_id_limit` transport parameter. @@ -504,14 +533,14 @@ impl Config { /// Set the `enable_multipath` transport parameter. /// The default value is false. (Experimental) - pub fn set_multipath(&mut self, v: bool) { + pub fn enable_multipath(&mut self, v: bool) { self.local_transport_params.enable_multipath = v; } /// Set the multipath scheduling algorithm /// The default value is MultipathAlgorithm::MinRtt - pub fn set_multipath_algor(&mut self, v: MultipathAlgorithm) { - self.multipath.multipath_algor = v; + pub fn set_multipath_algorithm(&mut self, v: MultipathAlgorithm) { + self.multipath.multipath_algorithm = v; } /// Set the maximum size of the connection flow control window. @@ -532,15 +561,20 @@ impl Config { self.max_concurrent_conns = v; } + /// Set whether stateless reset is allowed. + pub fn enable_stateless_reset(&mut self, enable_stateless_reset: bool) { + self.stateless_reset = enable_stateless_reset; + } + /// Set the key for reset token generation pub fn set_reset_token_key(&mut self, v: [u8; 64]) { // HMAC-SHA256 use a 512-bit block length self.reset_token_key = hmac::Key::new(hmac::HMAC_SHA256, &v); } - /// Set the lifetime of address token - pub fn set_address_token_lifetime(&mut self, seconds: u64) { - self.address_token_lifetime = Duration::from_secs(seconds); + /// Set whether stateless retry is allowed. Default is not allowed. + pub fn enable_retry(&mut self, enable_retry: bool) { + self.retry = enable_retry; } /// Set the key for address token generation. @@ -561,14 +595,9 @@ impl Config { Ok(()) } - /// Set whether stateless retry is allowed. Default is not allowed. - pub fn enable_retry(&mut self, enable_retry: bool) { - self.retry = enable_retry; - } - - /// Set whether stateless reset is allowed. - pub fn enable_stateless_reset(&mut self, enable_stateless_reset: bool) { - self.stateless_reset = enable_stateless_reset; + /// Set the lifetime of address token + pub fn set_address_token_lifetime(&mut self, seconds: u64) { + self.address_token_lifetime = Duration::from_secs(seconds); } /// Set the length of source cid. @@ -645,6 +674,12 @@ pub struct RecoveryConfig { /// The initial rtt, used before real rtt is estimated. pub initial_rtt: Duration, + + /// Linear factor for calculating the probe timeout. + pub pto_linear_factor: u64, + + // Upper limit of probe timeout. + pub max_pto: Duration, } impl Default for RecoveryConfig { @@ -652,10 +687,12 @@ impl Default for RecoveryConfig { RecoveryConfig { max_datagram_size: 1200, max_ack_delay: time::Duration::from_millis(0), - congestion_control_algorithm: CongestionControlAlgorithm::Cubic, + congestion_control_algorithm: CongestionControlAlgorithm::Bbr, min_congestion_window: 2_u64, initial_congestion_window: 10_u64, initial_rtt: INITIAL_RTT, + pto_linear_factor: DEFAULT_PTO_LINEAR_FACTOR, + max_pto: MAX_PTO, } } } @@ -665,13 +702,13 @@ impl Default for RecoveryConfig { #[derive(Debug, Clone)] pub struct MultipathConfig { /// Multipath scheduling algorithm. - multipath_algor: MultipathAlgorithm, + multipath_algorithm: MultipathAlgorithm, } impl Default for MultipathConfig { fn default() -> MultipathConfig { MultipathConfig { - multipath_algor: MultipathAlgorithm::MinRtt, + multipath_algorithm: MultipathAlgorithm::MinRtt, } } } @@ -696,6 +733,10 @@ enum Event { /// The connection has send a RETIRE_CONNECTION_ID frame. DcidRetired(ResetToken), + /// The client connection has received a stateless reset token from transport + /// parameters extension. + ResetTokenAdvertised(ResetToken), + /// The stream is created. StreamCreated(u64), @@ -864,6 +905,34 @@ mod tests { Ok(()) } + + #[test] + fn pto_linear_factor() -> Result<()> { + let mut config = Config::new()?; + assert_eq!(config.recovery.pto_linear_factor, DEFAULT_PTO_LINEAR_FACTOR); + + config.set_pto_linear_factor(0); + assert_eq!(config.recovery.pto_linear_factor, 0); + + config.set_pto_linear_factor(100); + assert_eq!(config.recovery.pto_linear_factor, 100); + + Ok(()) + } + + #[test] + fn max_pto() -> Result<()> { + let mut config = Config::new()?; + assert_eq!(config.recovery.max_pto, MAX_PTO); + + config.set_max_pto(0); + assert_eq!(config.recovery.max_pto, TIMER_GRANULARITY); + + config.set_max_pto(300000); + assert_eq!(config.recovery.max_pto, Duration::from_millis(300000)); + + Ok(()) + } } pub use crate::congestion_control::CongestionControlAlgorithm; diff --git a/src/multipath_scheduler/multipath_scheduler.rs b/src/multipath_scheduler/multipath_scheduler.rs index cd076792d..6f2faf8d3 100644 --- a/src/multipath_scheduler/multipath_scheduler.rs +++ b/src/multipath_scheduler/multipath_scheduler.rs @@ -19,6 +19,7 @@ use std::time::Instant; use self::scheduler_minrtt::*; use self::scheduler_redundant::*; +use self::scheduler_rr::*; use crate::connection::path::PathMap; use crate::connection::space::PacketNumSpaceMap; use crate::connection::space::SentPacket; @@ -54,7 +55,8 @@ pub(crate) trait MultipathScheduler { } } -/// Available multipath scheduling algorithm +/// Available multipath scheduling algorithms. +#[repr(C)] #[derive(Debug, Clone, Copy, PartialEq)] pub enum MultipathAlgorithm { /// The scheduler sends packets over the path with the lowest smoothed RTT @@ -71,6 +73,12 @@ pub enum MultipathAlgorithm { /// present, it ensures a goodput at least equivalent to the best single /// path. Redundant, + + /// The scheduler sends packets over available paths in a round robin + /// manner. It aims to fully utilize the capacity of each path as the + /// distribution across all path is equal. It is only used for testing + /// purposes. + RoundRobin, } impl FromStr for MultipathAlgorithm { @@ -81,6 +89,8 @@ impl FromStr for MultipathAlgorithm { Ok(MultipathAlgorithm::MinRtt) } else if algor.eq_ignore_ascii_case("redundant") { Ok(MultipathAlgorithm::Redundant) + } else if algor.eq_ignore_ascii_case("roundrobin") { + Ok(MultipathAlgorithm::RoundRobin) } else { Err(Error::InvalidConfig("unknown".into())) } @@ -89,9 +99,10 @@ impl FromStr for MultipathAlgorithm { /// Build a multipath scheduler pub(crate) fn build_multipath_scheduler(conf: &MultipathConfig) -> Box<dyn MultipathScheduler> { - match conf.multipath_algor { + match conf.multipath_algorithm { MultipathAlgorithm::MinRtt => Box::new(MinRttScheduler::new(conf)), MultipathAlgorithm::Redundant => Box::new(RedundantScheduler::new(conf)), + MultipathAlgorithm::RoundRobin => Box::new(RoundRobinScheduler::new(conf)), } } @@ -99,6 +110,7 @@ pub(crate) fn reinjection_required(algor: MultipathAlgorithm) -> bool { match algor { MultipathAlgorithm::MinRtt => false, MultipathAlgorithm::Redundant => true, + MultipathAlgorithm::RoundRobin => false, } } @@ -176,6 +188,10 @@ pub(crate) mod tests { ("redundant", Ok(MultipathAlgorithm::Redundant)), ("Redundant", Ok(MultipathAlgorithm::Redundant)), ("REDUNDANT", Ok(MultipathAlgorithm::Redundant)), + ("roundrobin", Ok(MultipathAlgorithm::RoundRobin)), + ("Roundrobin", Ok(MultipathAlgorithm::RoundRobin)), + ("RoundRobin", Ok(MultipathAlgorithm::RoundRobin)), + ("ROUNDROBIN", Ok(MultipathAlgorithm::RoundRobin)), ("redun", Err(Error::InvalidConfig("unknown".into()))), ]; @@ -187,3 +203,4 @@ pub(crate) mod tests { mod scheduler_minrtt; mod scheduler_redundant; +mod scheduler_rr; diff --git a/src/multipath_scheduler/scheduler_rr.rs b/src/multipath_scheduler/scheduler_rr.rs new file mode 100644 index 000000000..210719d04 --- /dev/null +++ b/src/multipath_scheduler/scheduler_rr.rs @@ -0,0 +1,153 @@ +// Copyright (c) 2023 The TQUIC Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::connection::path::PathMap; +use crate::connection::space::PacketNumSpaceMap; +use crate::connection::stream::StreamMap; +use crate::multipath_scheduler::MultipathScheduler; +use crate::Error; +use crate::MultipathConfig; +use crate::Path; +use crate::Result; + +/// RoundRobinScheduler iterates over the available paths and select the next +/// one whose congestion window is open. +/// +/// The simple scheduler aims to guarantee that the capacity of each path is +/// fully utilized as the distribution across all path is equal. It is for +/// testing purposes only. +pub struct RoundRobinScheduler { + last: Option<usize>, +} + +impl RoundRobinScheduler { + pub fn new(_conf: &MultipathConfig) -> RoundRobinScheduler { + RoundRobinScheduler { last: None } + } +} + +impl RoundRobinScheduler { + /// Iterate and find the last used path + fn find_last(&self, iter: &mut slab::Iter<Path>, last: usize) -> bool { + for (pid, _) in iter.by_ref() { + if pid != last { + continue; + } + return true; + } + false + } + + /// Try to select an available path + fn select(&mut self, iter: &mut slab::Iter<Path>) -> Option<usize> { + for (pid, path) in iter.by_ref() { + // Skip the path that is not ready for sending non-probing packets. + if !path.active() || !path.recovery.can_send() { + continue; + } + + self.last = Some(pid); + return Some(pid); + } + None + } +} + +impl MultipathScheduler for RoundRobinScheduler { + /// Select the next path with sufficient congestion window. + fn on_select( + &mut self, + paths: &mut PathMap, + spaces: &mut PacketNumSpaceMap, + streams: &mut StreamMap, + ) -> Result<usize> { + let mut iter = paths.iter(); + let mut exist_last = false; + + // Iterate and find the last used path + if let Some(last) = self.last { + if self.find_last(&mut iter, last) { + exist_last = true; + } else { + // The last path has been abandoned + iter = paths.iter(); + } + } + + // Find the next available path + if let Some(pid) = self.select(&mut iter) { + return Ok(pid); + } + if !exist_last { + return Err(Error::Done); + } + + let mut iter = paths.iter(); + if let Some(pid) = self.select(&mut iter) { + return Ok(pid); + } + Err(Error::Done) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::multipath_scheduler::tests::*; + + #[test] + fn round_robin_single_available_path() -> Result<()> { + let mut t = MultipathTester::new()?; + + let mut s = RoundRobinScheduler::new(&MultipathConfig::default()); + assert_eq!(s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams)?, 0); + assert_eq!(s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams)?, 0); + Ok(()) + } + + #[test] + fn round_robin_multi_available_path() -> Result<()> { + let mut t = MultipathTester::new()?; + t.add_path("127.0.0.1:443", "127.0.0.2:8443", 50)?; + t.add_path("127.0.0.1:443", "127.0.0.3:8443", 150)?; + t.add_path("127.0.0.1:443", "127.0.0.4:8443", 100)?; + + let mut s = RoundRobinScheduler::new(&MultipathConfig::default()); + assert_eq!(s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams)?, 0); + assert_eq!(s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams)?, 1); + assert_eq!(s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams)?, 2); + assert_eq!(s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams)?, 3); + + t.set_path_active(1, false)?; + assert_eq!(s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams)?, 0); + assert_eq!(s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams)?, 2); + + t.set_path_active(3, false)?; + assert_eq!(s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams)?, 0); + Ok(()) + } + + #[test] + fn round_robin_no_available_path() -> Result<()> { + let mut t = MultipathTester::new()?; + t.set_path_active(0, false)?; + + let mut s = RoundRobinScheduler::new(&MultipathConfig::default()); + assert_eq!( + s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams), + Err(Error::Done) + ); + Ok(()) + } +} diff --git a/src/packet.rs b/src/packet.rs index 6e3019e6e..a3a403970 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -739,23 +739,39 @@ pub fn verify_retry_integrity_tag(buf: &mut [u8], odcid: &[u8], version: u32) -> } /// Encode a Stateless Reset packet to the given buffer -pub fn stateless_reset(padding_len: usize, token: &[u8], mut out: &mut [u8]) -> Result<usize> { - let len = 1 + padding_len + token.len(); - if out.len() < len { +/// +/// The `pkt_len` is the length of Stateless Reset packet. +/// The `token` is the Stateless Reset token. +pub fn stateless_reset(pkt_len: usize, token: &[u8], mut out: &mut [u8]) -> Result<usize> { + if pkt_len > out.len() { return Err(Error::BufferTooShort); } + if pkt_len < crate::MIN_RESET_PACKET_LEN { + return Err(Error::InternalError); + } if token.len() != crate::RESET_TOKEN_LEN { return Err(Error::InternalError); } - let first = 0b0100_0000; - out.write_u8(first)?; + // The layout of Stateless Reset packet: + // + // Stateless Reset { + // Fixed Bits (2) = 1, + // Unpredictable Bits (38..), + // Stateless Reset Token (128), + // } + + // Write the Unpredictable Bits + let unpredict_len = pkt_len - crate::RESET_TOKEN_LEN; + rand::thread_rng().fill_bytes(&mut out[..unpredict_len]); - rand::thread_rng().fill_bytes(&mut out[..padding_len]); - out = &mut out[padding_len..]; + // Set the 2 fixed bits + out[0] = (out[0] & 0b0011_1111) | HEADER_FIXED_BIT; + // Write the Stateless Reset Token + out = &mut out[unpredict_len..]; out.write(token)?; - Ok(len) + Ok(pkt_len) } #[cfg(test)] @@ -1009,6 +1025,10 @@ mod tests { stateless_reset(64, &token, &mut buf[..10]), Err(Error::BufferTooShort) ); + assert_eq!( + stateless_reset(16, &token, &mut buf), + Err(Error::InternalError) + ); assert_eq!( stateless_reset(64, &token[..10], &mut buf), Err(Error::InternalError) @@ -1016,7 +1036,7 @@ mod tests { let len = stateless_reset(64, &token, &mut buf)?; let buf = &buf[..len]; - assert_eq!(buf[0], 0b0100_0000); + assert_eq!(buf[0] & 0b1100_0000, 0b0100_0000); // The 2 fixed bytes is 01 assert_eq!(ResetToken::from_bytes(buf)?.0, token); Ok(()) diff --git a/src/tls/tls.rs b/src/tls/tls.rs index f6c4f4b85..078cf8a4e 100644 --- a/src/tls/tls.rs +++ b/src/tls/tls.rs @@ -236,7 +236,6 @@ pub struct TlsSessionData { error: Option<TlsError>, trace_id: String, write_method: Option<WriteMethod>, - // TODO: drop conf_selector after TlsConfigSelector::find() is called. conf_selector: Option<Arc<dyn TlsConfigSelector>>, early_data_rejected: bool, } @@ -325,7 +324,12 @@ impl TlsSession { return self.session.process_post_handshake(&mut self.data); } - self.session.do_handshake(&mut self.data) + self.session.do_handshake(&mut self.data)?; + if self.session.is_completed() { + self.data.conf_selector = None; + } + + Ok(()) } /// Reset tls session state. @@ -1039,6 +1043,8 @@ pub(crate) mod tests { let conf_selector = Arc::new(ServerConfigSelector::new()?); for i in 0..conf_selector.len() { + assert_eq!(Arc::strong_count(&conf_selector), 1); + let server_name = i.to_string(); let tls_session_pair = handshake_with_multi_cert(conf_selector.clone(), Some(&server_name))?; @@ -1049,6 +1055,7 @@ pub(crate) mod tests { i + 1 ); assert!(tls_session_pair.server.is_completed()); + assert_eq!(Arc::strong_count(&conf_selector), 1); } Ok(()) diff --git a/src/token.rs b/src/token.rs index bf4f12d30..5aa53f037 100644 --- a/src/token.rs +++ b/src/token.rs @@ -324,6 +324,16 @@ impl ResetToken { token.0.copy_from_slice(&buf[buf.len() - RESET_TOKEN_LEN..]); Ok(token) } + + /// Encode a Reset Token to a 128 bit integer + pub(crate) fn to_u128(self) -> u128 { + u128::from_be_bytes(self.0) + } + + /// Decode a Reset Token from a 128 bit integer + pub(crate) fn from_u128(v: u128) -> ResetToken { + ResetToken(v.to_be_bytes()) + } } impl std::ops::Deref for ResetToken { @@ -531,6 +541,10 @@ mod tests { assert_eq!(ResetToken::new(&buf), Err(Error::BufferTooShort)); assert_eq!(ResetToken::from_bytes(&buf), Err(Error::BufferTooShort)); + let token = ResetToken::generate(&key, &c1); + assert_eq!(ResetToken::from_u128(token.to_u128()), token); + assert_eq!(token.to_u128().to_be_bytes(), token.0); + Ok(()) } } diff --git a/tools/Cargo.toml b/tools/Cargo.toml index 685e10c05..ce98d07ed 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic_tools" -version = "0.4.0" +version = "0.5.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" @@ -22,6 +22,7 @@ slab = "0.4" rand = "0.8.5" statrs = "0.16" tikv-jemallocator = "0.5" +signal-hook = "0.3.17" tquic = { path = "..", version = "0.4.0"} [lib] diff --git a/tools/src/bin/tquic_client.rs b/tools/src/bin/tquic_client.rs index 0e005604c..35795985c 100644 --- a/tools/src/bin/tquic_client.rs +++ b/tools/src/bin/tquic_client.rs @@ -25,6 +25,8 @@ use std::net::Ipv6Addr; use std::net::SocketAddr; use std::net::ToSocketAddrs; use std::rc::Rc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; use std::thread; @@ -36,6 +38,7 @@ use clap::CommandFactory; use clap::Parser; use log::debug; use log::error; +use log::info; use log::warn; use mio::event::Event; use rand::Rng; @@ -62,8 +65,7 @@ use tquic::PacketInfo; use tquic::TlsConfig; use tquic::TransportHandler; use tquic::TIMER_GRANULARITY; -use tquic_tools::alpns; -use tquic_tools::AppProto; +use tquic_tools::ApplicationProto; use tquic_tools::QuicSocket; use tquic_tools::Result; @@ -102,6 +104,10 @@ pub struct ClientOpt { #[clap(short, long, default_value = "0", value_name = "TIME")] pub duration: u64, + /// Client will exit if consecutive failure reaches the threshold at the beginning. + #[clap(long, default_value = "10", value_name = "NUM")] + pub connection_failure_threshold: u64, + /// Number of max samples per thread used for request time statistics. #[clap(long, default_value = "100000", value_name = "NUM")] pub max_sample: usize, @@ -118,7 +124,7 @@ pub struct ClientOpt { #[clap(short, long, value_name = "ADDR")] pub connect_to: Option<SocketAddr>, - /// ALPN, support "http/0.9", "hq-interop" and "h3", separated by ",". + /// ALPN, separated by ",". #[clap( short, long, @@ -126,7 +132,7 @@ pub struct ClientOpt { default_value = "h3,http/0.9,hq-interop", value_name = "STR" )] - pub alpn: Vec<Vec<u8>>, + pub alpn: Vec<ApplicationProto>, /// Dump response body into the given directory. /// If the specified directory does not exist, a new directory will be created. @@ -147,7 +153,7 @@ pub struct ClientOpt { pub disable_stateless_reset: bool, /// Congestion control algorithm. - #[clap(long, default_value = "CUBIC")] + #[clap(long, default_value = "BBR")] pub congestion_control_algor: CongestionControlAlgorithm, /// Initial congestion window in packets. @@ -190,6 +196,14 @@ pub struct ClientOpt { #[clap(long, default_value = "333", value_name = "TIME")] pub initial_rtt: u64, + /// Linear factor for calculating the probe timeout. + #[clap(long, default_value = "3", value_name = "NUM")] + pub pto_linear_factor: u64, + + /// Upper limit of probe timeout in microseconds. + #[clap(long, default_value = "10000", value_name = "TIME")] + pub max_pto: u64, + /// Save TLS key log into the given file. #[clap(short, long, value_name = "FILE")] pub keylog_file: Option<String>, @@ -215,17 +229,23 @@ struct Client { /// Client start time. start_time: Instant, + + /// If terminated by system signal. + terminated: Arc<AtomicBool>, } impl Client { /// Create a new multi-threads client. pub fn new(option: ClientOpt) -> Result<Self> { let client_ctx = Arc::new(Mutex::new(ClientContext::default())); + let terminated = Arc::new(AtomicBool::new(false)); + signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&terminated))?; Ok(Self { option, context: client_ctx, start_time: Instant::now(), + terminated, }) } @@ -236,8 +256,9 @@ impl Client { for i in 0..self.option.threads { let client_opt = self.option.clone(); let client_ctx = self.context.clone(); + let terminated = self.terminated.clone(); let thread = thread::spawn(move || { - let mut worker = Worker::new(i, client_opt, client_ctx).unwrap(); + let mut worker = Worker::new(i, client_opt, client_ctx, terminated).unwrap(); worker.start().unwrap(); }); threads.push(thread); @@ -379,6 +400,9 @@ struct Worker { /// Worker end time. end_time: Option<Instant>, + + /// If terminated by system signal. + terminated: Arc<AtomicBool>, } impl Worker { @@ -387,12 +411,15 @@ impl Worker { index: u32, option: ClientOpt, client_ctx: Arc<Mutex<ClientContext>>, + terminated: Arc<AtomicBool>, ) -> Result<Self> { let mut config = Config::new()?; config.enable_stateless_reset(!option.disable_stateless_reset); config.set_max_handshake_timeout(option.handshake_timeout); config.set_max_idle_timeout(option.idle_timeout); config.set_initial_rtt(option.initial_rtt); + config.set_pto_linear_factor(option.pto_linear_factor); + config.set_max_pto(option.max_pto); config.set_max_concurrent_conns(option.max_concurrent_conns); config.set_initial_max_streams_bidi(option.max_concurrent_requests); config.set_send_batch_size(option.send_batch_size); @@ -401,10 +428,12 @@ impl Worker { config.set_congestion_control_algorithm(option.congestion_control_algor); config.set_initial_congestion_window(option.initial_congestion_window); config.set_min_congestion_window(option.min_congestion_window); - config.set_multipath(option.enable_multipath); - config.set_multipath_algor(option.multipath_algor); - let tls_config = - TlsConfig::new_client_config(option.alpn.clone(), option.enable_early_data)?; + config.enable_multipath(option.enable_multipath); + config.set_multipath_algorithm(option.multipath_algor); + let tls_config = TlsConfig::new_client_config( + ApplicationProto::convert_to_vec(&option.alpn), + option.enable_early_data, + )?; config.set_tls_config(tls_config); let poll = mio::Poll::new()?; @@ -434,6 +463,7 @@ impl Worker { recv_buf: vec![0u8; MAX_BUF_SIZE], start_time: Instant::now(), end_time: None, + terminated, }) } @@ -475,48 +505,68 @@ impl Worker { Ok(()) } - fn process(&mut self) -> Result<bool> { - // Process connections. - self.endpoint.process_connections()?; + fn should_exit(&self) -> bool { + if self.terminated.load(Ordering::Relaxed) { + info!("worker terminated by system signal and waiting for tasks to finish."); + return true; + } + let worker_ctx = self.worker_ctx.borrow(); + debug!("worker concurrent conns {}", worker_ctx.concurrent_conns); + + if !worker_ctx.connected + && worker_ctx.conn_finish_failed >= self.option.connection_failure_threshold { - let worker_ctx = self.worker_ctx.borrow(); - debug!("worker concurrent conns {}", worker_ctx.concurrent_conns); - - // Check close. - if (self.option.duration > 0 - && (Instant::now() - self.start_time).as_secs() > self.option.duration) - || (self.option.max_requests_per_thread > 0 - && worker_ctx.request_done >= self.option.max_requests_per_thread) - { - debug!( - "worker should exit, concurrent conns {}, request sent {}, request done {}", - worker_ctx.concurrent_conns, worker_ctx.request_sent, worker_ctx.request_done, - ); + error!( + "connect server[{:?}] failed", + self.option.connect_to.unwrap() + ); + return true; + } - // Close endpoint. - self.endpoint.close(false); + if (self.option.duration > 0 + && (Instant::now() - self.start_time).as_secs() > self.option.duration) + || (self.option.max_requests_per_thread > 0 + && worker_ctx.request_done >= self.option.max_requests_per_thread) + { + debug!( + "worker should exit, concurrent conns {}, request sent {}, request done {}", + worker_ctx.concurrent_conns, worker_ctx.request_sent, worker_ctx.request_done, + ); + return true; + } - // Close connections. - let mut senders = self.senders.borrow_mut(); - for (index, _) in senders.iter_mut() { - let conn = self.endpoint.conn_get_mut(*index).unwrap(); - _ = conn.close(true, 0x00, b"ok"); - } + false + } - // Update worker end time. - if self.end_time.is_none() { - debug!("all tasks finished, update the end time and wait for saving session."); - self.end_time = Some(Instant::now()); - } + fn process(&mut self) -> Result<bool> { + // Process connections. + self.endpoint.process_connections()?; - if senders.len() == 0 { - // All connections are closed. - return Ok(true); - } + // Check exit. + if self.should_exit() { + // Close endpoint. + self.endpoint.close(false); + + // Close connections. + let mut senders = self.senders.borrow_mut(); + for (index, _) in senders.iter_mut() { + let conn = self.endpoint.conn_get_mut(*index).unwrap(); + _ = conn.close(true, 0x00, b"ok"); + } + + // Update worker end time. + if self.end_time.is_none() { + debug!("all tasks finished, update the end time and wait for saving session."); + self.end_time = Some(Instant::now()); + } - return Ok(false); + if senders.len() == 0 { + // All connections are closed. + return Ok(true); } + + return Ok(false); } // Check and create new connections. @@ -636,6 +686,7 @@ struct WorkerContext { conn_finish_failed: u64, concurrent_conns: u32, conn_stats: ConnectionStats, + connected: bool, } impl WorkerContext { @@ -755,7 +806,7 @@ struct RequestSender { worker_ctx: Rc<RefCell<WorkerContext>>, /// Application protocol, http/0.9 or h3. - app_proto: AppProto, + app_proto: ApplicationProto, /// Next available stream id, used in http/0.9 mode. next_stream_id: u64, @@ -780,21 +831,15 @@ impl RequestSender { buf: vec![0; MAX_BUF_SIZE], streams: FxHashMap::default(), worker_ctx, - app_proto: AppProto::H3, + app_proto: ApplicationProto::from_slice(conn.application_proto()), next_stream_id: 0, h3_conn: None, }; - let application_proto = conn.application_proto(); - if alpns::HTTP_09.contains(&application_proto) { - sender.app_proto = AppProto::Http09; - } else if alpns::HTTP_3.contains(&application_proto) { - sender.app_proto = AppProto::H3; + if sender.app_proto == ApplicationProto::H3 { sender.h3_conn = Some( Http3Connection::new_with_quic_conn(conn, &Http3Config::new().unwrap()).unwrap(), ); - } else { - unreachable!(); } sender @@ -831,12 +876,11 @@ impl RequestSender { _ = conn.stream_want_read(stream_id, true); - if self.app_proto == AppProto::H3 { - self.recv_h3_responses(conn, stream_id) - } else if self.app_proto == AppProto::Http09 { - self.recv_http09_responses(conn, stream_id) - } else { - unreachable!(); + match self.app_proto { + ApplicationProto::Interop | ApplicationProto::Http09 => { + self.recv_http09_responses(conn, stream_id) + } + ApplicationProto::H3 => self.recv_h3_responses(conn, stream_id), } } @@ -850,12 +894,11 @@ impl RequestSender { self.current_url_idx ); - let s = if self.app_proto == AppProto::H3 { - self.send_h3_request(conn, &request)? - } else if self.app_proto == AppProto::Http09 { - self.send_http09_request(conn, &request)? - } else { - unreachable!() + let s = match self.app_proto { + ApplicationProto::Interop | ApplicationProto::Http09 => { + self.send_http09_request(conn, &request)? + } + ApplicationProto::H3 => self.send_h3_request(conn, &request)?, }; request.start_time = Some(Instant::now()); @@ -1198,6 +1241,7 @@ impl TransportHandler for WorkerHandler { { let mut worker_ctx = self.worker_ctx.borrow_mut(); worker_ctx.conn_handshake_success += 1; + worker_ctx.connected = true; } // Try to add additional paths diff --git a/tools/src/bin/tquic_server.rs b/tools/src/bin/tquic_server.rs index a11a98597..b2ddec8fc 100644 --- a/tools/src/bin/tquic_server.rs +++ b/tools/src/bin/tquic_server.rs @@ -43,8 +43,7 @@ use tquic::PacketInfo; use tquic::TlsConfig; use tquic::TransportHandler; use tquic::TIMER_GRANULARITY; -use tquic_tools::alpns; -use tquic_tools::AppProto; +use tquic_tools::ApplicationProto; use tquic_tools::QuicSocket; use tquic_tools::Result; @@ -96,7 +95,7 @@ pub struct ServerOpt { pub disable_stateless_reset: bool, /// Congestion control algorithm. - #[clap(long, default_value = "CUBIC")] + #[clap(long, default_value = "BBR")] pub congestion_control_algor: CongestionControlAlgorithm, /// Initial congestion window in packets. @@ -135,6 +134,14 @@ pub struct ServerOpt { #[clap(long, default_value = "333", value_name = "TIME")] pub initial_rtt: u64, + /// Linear factor for calculating the probe timeout. + #[clap(long, default_value = "3", value_name = "NUM")] + pub pto_linear_factor: u64, + + /// Upper limit of probe timeout in microseconds. + #[clap(long, default_value = "10000", value_name = "TIME")] + pub max_pto: u64, + /// Save TLS key log into the given file. #[clap(long, value_name = "FILE")] pub keylog_file: Option<String>, @@ -176,12 +183,14 @@ impl Server { config.set_max_handshake_timeout(option.handshake_timeout); config.set_max_idle_timeout(option.idle_timeout); config.set_initial_rtt(option.initial_rtt); + config.set_pto_linear_factor(option.pto_linear_factor); + config.set_max_pto(option.max_pto); + config.set_send_batch_size(option.send_batch_size); config.set_congestion_control_algorithm(option.congestion_control_algor); config.set_initial_congestion_window(option.initial_congestion_window); config.set_min_congestion_window(option.min_congestion_window); - config.set_send_batch_size(option.send_batch_size); - config.set_multipath(option.enable_multipath); - config.set_multipath_algor(option.multipath_algor); + config.enable_multipath(option.enable_multipath); + config.set_multipath_algorithm(option.multipath_algor); if let Some(address_token_key) = &option.address_token_key { let address_token_key = convert_address_token_key(address_token_key); @@ -270,7 +279,7 @@ struct Response { #[derive(Default)] struct ConnectionHandler { /// Application protocol. - app_proto: AppProto, + app_proto: ApplicationProto, /// File root directory. root: String, @@ -522,13 +531,12 @@ impl ConnectionHandler { } fn recv_request(&mut self, buf: &mut [u8], conn: &mut Connection, stream_id: u64) { - if self.app_proto == AppProto::H3 { - self.recv_h3_request(conn); - } else if self.app_proto == AppProto::Http09 { - self.recv_http09_request(buf, conn, stream_id); - } else { - unreachable!() - }; + match self.app_proto { + ApplicationProto::Interop | ApplicationProto::Http09 => { + self.recv_http09_request(buf, conn, stream_id) + } + ApplicationProto::H3 => self.recv_h3_request(conn), + } } fn send_http09_response(&mut self, conn: &mut Connection, stream_id: u64) { @@ -597,13 +605,12 @@ impl ConnectionHandler { _ = conn.stream_want_write(stream_id, true); - if self.app_proto == AppProto::H3 { - self.send_h3_response(conn, stream_id); - } else if self.app_proto == AppProto::Http09 { - self.send_http09_response(conn, stream_id); - } else { - unreachable!() - }; + match self.app_proto { + ApplicationProto::Interop | ApplicationProto::Http09 => { + self.send_http09_response(conn, stream_id) + } + ApplicationProto::H3 => self.send_h3_response(conn, stream_id), + } } } @@ -663,21 +670,17 @@ impl ServerHandler { debug!("{} new connection handler", conn.trace_id()); let mut conn_handler = ConnectionHandler { + app_proto: ApplicationProto::from_slice(conn.application_proto()), root: self.root.clone(), ..Default::default() }; - let app_proto = conn.application_proto(); - if alpns::HTTP_09.contains(&app_proto) { - conn_handler.app_proto = AppProto::Http09; - } else if alpns::HTTP_3.contains(&app_proto) { - conn_handler.app_proto = AppProto::H3; + + if conn_handler.app_proto == ApplicationProto::H3 { conn_handler.h3_conn = Some( Http3Connection::new_with_quic_conn(conn, &Http3Config::new().unwrap()).unwrap(), ); - } else { - error!("{} alpn unknown {:?}", conn.trace_id(), app_proto); - unreachable!(); } + self.conns.insert(index, conn_handler); } } @@ -722,7 +725,9 @@ impl TransportHandler for ServerHandler { let index = conn.index().unwrap(); let conn_handler = self.conns.get_mut(&index).unwrap(); - if conn_handler.app_proto == AppProto::Http09 { + if conn_handler.app_proto == ApplicationProto::Interop + || conn_handler.app_proto == ApplicationProto::Http09 + { conn_handler.http09_requests.insert(stream_id, b"".to_vec()); } } diff --git a/tools/src/common.rs b/tools/src/common.rs index 64e154318..229929d9a 100644 --- a/tools/src/common.rs +++ b/tools/src/common.rs @@ -18,6 +18,8 @@ use std::net::Ipv4Addr; use std::net::Ipv6Addr; use std::net::SocketAddr; +use clap::builder::PossibleValue; +use clap::ValueEnum; use log::debug; use mio::net::UdpSocket; use mio::Interest; @@ -31,19 +33,63 @@ use tquic::PacketSendHandler; pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; -/// Supported ALPNs. -pub mod alpns { - pub const HTTP_09: [&[u8]; 2] = [b"hq-interop", b"http/0.9"]; - pub const HTTP_3: [&[u8]; 1] = [b"h3"]; -} +/// Supported application protocols. +#[derive(Clone, Copy, Default, PartialEq, Debug)] +pub enum ApplicationProto { + /// Proto for QUIC interop, see https://github.com/quic-interop/quic-interop-runner + Interop, -#[derive(Default, PartialEq)] -pub enum AppProto { + /// HTTP/0.9, see https://http.dev/0.9 Http09, + + /// HTTP/3, see https://www.rfc-editor.org/rfc/rfc9114.html #[default] H3, } +impl ApplicationProto { + /// Create a new ApplicationProto from byte slice. + pub fn from_slice(proto: &[u8]) -> Self { + match proto { + b"hq-interop" => Self::Interop, + b"http/0.9" => Self::Http09, + b"h3" => Self::H3, + _ => unreachable!(), + } + } + + /// Convert an ApplicationProto into a byte slice. + pub fn to_slice(&self) -> &[u8] { + match self { + Self::Interop => b"hq-interop", + Self::Http09 => b"http/0.9", + Self::H3 => b"h3", + } + } + + /// Convert an ApplicationProto slice to a two-dimension byte vector. + pub fn convert_to_vec(protos: &[Self]) -> Vec<Vec<u8>> { + protos + .iter() + .map(|proto| proto.to_slice().to_vec()) + .collect() + } +} + +impl ValueEnum for ApplicationProto { + fn to_possible_value(&self) -> Option<PossibleValue> { + Some(match self { + Self::Interop => PossibleValue::new("hq-interop"), + Self::Http09 => PossibleValue::new("http/0.9"), + Self::H3 => PossibleValue::new("h3"), + }) + } + + fn value_variants<'a>() -> &'a [Self] { + &[Self::Interop, Self::Http09, Self::H3] + } +} + /// UDP socket wrapper for QUIC pub struct QuicSocket { /// The underlying UDP sockets for QUIC Endpoint.