diff --git a/.github/workflows/plot-benchmark.py b/.github/workflows/plot-benchmark.py new file mode 100644 index 00000000..d9391d17 --- /dev/null +++ b/.github/workflows/plot-benchmark.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 + +import os +import sys + +import datetime +import matplotlib.pyplot as plt +import numpy as np +import prettytable +import termcolor + +# Benchmark scenarios +SCENARIO = ["long", "short"] + +# QUIC implementations. +# The first element is used as the normalization base. +IMPLS = ["lsquic", "tquic"] + +# Round of benchmark in one scenario. +ROUND = 5 + +# File sizes in long connection scenario benchmark. +LONG_FILE_SIZES = ["15K", "50K", "2M"] + +# File sizes in short connection scenario benchmark. +SHORT_FILE_SIZES = ["1K"] + +# Different concurrent connections. +LONG_CONNS = [10] + +# Different concurrent connections. +SHORT_CONNS = [10] + +# Different concurrent streams. +LONG_STREAMS = [1, 10] + +# Different concurrent streams. +SHORT_STREAMS = [1] + +# Time span of the trend chart. +DAYS = 90 + +# Read data from benchmark result file. +def read_data(data_dir, scen, impl, size, conn, stream, round, date): + dirname = "benchmark_%s_%s_%d_%d.%s" % (scen, size, conn, stream, date) + filename = "benchmark_%s_%s_%s_%d_%d.%d.%s" % (scen, impl, size, conn, stream, round, date) + path = os.path.join(data_dir, dirname, filename) + try: + with open(path) as f: + data = f.read().strip() + return float(data) + except: + return 0.0 + +# Load benchmark results into array. +def prepare_data(data_dir): + titles = [' ' for _ in range((len(LONG_FILE_SIZES)*len(LONG_CONNS)*len(LONG_STREAMS) + len(SHORT_FILE_SIZES)*len(SHORT_CONNS)*len(SHORT_STREAMS)))] + result = [[[[0.0 for _ in range(len(LONG_FILE_SIZES)*len(LONG_CONNS)*len(LONG_STREAMS) + len(SHORT_FILE_SIZES)*len(SHORT_CONNS)*len(SHORT_STREAMS))] for _ in range(len(IMPLS))] for _ in range(DAYS)] for _ in range(ROUND) ] + + # Load long connection scenario result. + I = len(LONG_FILE_SIZES) + J = len(LONG_CONNS) + K = len(LONG_STREAMS) + N = len(IMPLS) + D = DAYS + for i in range(I): + for j in range(J): + for k in range(K): + titles[i*J*K+j*K+k] = "long %s %d %d" % (LONG_FILE_SIZES[i], LONG_CONNS[j], LONG_STREAMS[k]) + for n in range(N): + for d in range(D): + for r in range(ROUND): + date = (datetime.datetime.now() - datetime.timedelta(days=d)).strftime('%Y-%m-%d') + result[r][D-1-d][n][i*J*K+j*K+k] = read_data(data_dir, "long", IMPLS[n], LONG_FILE_SIZES[i], LONG_CONNS[j], LONG_STREAMS[k], r, date) + + # Load short connection scenario result. + M = len(LONG_FILE_SIZES)*len(LONG_CONNS)*len(LONG_STREAMS) + I = len(SHORT_FILE_SIZES) + J = len(SHORT_CONNS) + K = len(SHORT_STREAMS) + N = len(IMPLS) + D = DAYS + for i in range(I): + for j in range(J): + for k in range(K): + titles[M+i*J*K+j*K+k] = "short %s %d %d" % (SHORT_FILE_SIZES[i], SHORT_CONNS[j], SHORT_STREAMS[k]) + for n in range(N): + for d in range(D): + for r in range(ROUND): + date = (datetime.datetime.now() - datetime.timedelta(days=d)).strftime('%Y-%m-%d') + result[r][D-1-d][n][M+i*J*K+j*K+k] = read_data(data_dir, "short", IMPLS[n], SHORT_FILE_SIZES[i], SHORT_CONNS[j], LONG_STREAMS[k], r, date) + + # Average by rounds. + result_avg = np.mean(np.array(result), axis=0).tolist() + + # Normalize benchmark result. + for d in range(D): + base = result_avg[d][0] + for i in range(1, len(result_avg[d])): + result_avg[d][i] = [round(x/y, 4) if y != 0 else 0 for x, y in zip(result_avg[d][i], base)] + for i in range(len(result_avg[d][0])): + if result_avg[d][0][i] != 0: + result_avg[d][0][i] = 1 + + return titles, result_avg + +# Print benchmark performance result to stdout. +def show(titles, result): + table = prettytable.PrettyTable() + table.field_names = titles + + for i in range(len(result)): + colored_row_name = termcolor.colored(IMPLS[i], 'green') + table.add_row([colored_row_name] + result[i]) + + print(table) + +# Plot graph according to benchmark performance result. +def plot(titles, result): + + N = len(titles) + M = len(result) + + width = 0.35 + gap = 0.5 + + ind = np.arange(N) * (width * M + gap) + + fig, ax = plt.subplots() + fig.set_size_inches(10, 5) + for i in range(M): + ax.bar(ind + i*width, result[i], width, label=IMPLS[i]) + + ax.set_ylabel('RPS') + ax.set_title('TQUIC benchmark') + ax.set_xticks(ind + width * M / 2) + ax.set_xticklabels(titles, rotation=45, fontsize=6) + + ax.legend() + + plt.savefig("benchmark_all.png", dpi=300) + +# Plot trend of latest days. +def trend(titles, result): + num_scenarios = len(result[0][0]) + num_curves = len(result[0]) + + fig = plt.figure(figsize=(10, num_scenarios*5)) + + for s in range(num_scenarios): + ax = fig.add_subplot(num_scenarios, 1, s+1) + ax.set_title(titles[s]) + ax.set_xlabel("Date") + ax.set_ylabel("RPS") + + for c in range(num_curves): + y_values = [result[d][c][s] for d in range(DAYS)] + ax.plot(list(range(DAYS)), y_values, label=IMPLS[c]) + + ax.legend() + + plt.tight_layout() + plt.savefig("benchmark_all_trend.png", dpi=300) + +if __name__ == '__main__': + if len(sys.argv) < 2: + print("Usage: %s [data_dir]" % (sys.argv[0])) + exit(1) + + data_dir= sys.argv[1] + titles, result = prepare_data(data_dir) + plot(titles, result[DAYS-1]) + trend(titles, result) + titles.insert(0, '') + show(titles, result[DAYS-1]) + diff --git a/.github/workflows/plot-fct.py b/.github/workflows/plot-fct.py index 67b5f504..d21ec5d4 100644 --- a/.github/workflows/plot-fct.py +++ b/.github/workflows/plot-fct.py @@ -7,7 +7,7 @@ import matplotlib.pyplot as plt import numpy as np -# QUIC implementes +# QUIC implementations IMPLS = ["tquic", "gquiche", "lsquic", "picoquic", "quiche"] # Different modes diff --git a/.github/workflows/plot-goodput.py b/.github/workflows/plot-goodput.py index b55dec6f..21b802ae 100644 --- a/.github/workflows/plot-goodput.py +++ b/.github/workflows/plot-goodput.py @@ -7,7 +7,7 @@ import matplotlib.pyplot as plt import numpy as np -# QUIC implementes +# QUIC implementations IMPLS = ["tquic", "gquiche", "lsquic", "picoquic", "quiche"] # Different file sizes diff --git a/.github/workflows/plot-interop.py b/.github/workflows/plot-interop.py index 1995ef29..b52fd9d2 100644 --- a/.github/workflows/plot-interop.py +++ b/.github/workflows/plot-interop.py @@ -8,7 +8,7 @@ import numpy as np from matplotlib.colors import ListedColormap -# QUIC implementes +# QUIC implementations CLIENT_IMPLS = ["tquic", "lsquic", "quiche", "picoquic", "ngtcp2", "msquic", "s2n-quic", "quinn", "neqo", "kwik", "aioquic", "chrome", "go-x-net", "quic-go", "mvfst"] diff --git a/.github/workflows/tquic-benchmark.yml b/.github/workflows/tquic-benchmark.yml new file mode 100644 index 00000000..186195ab --- /dev/null +++ b/.github/workflows/tquic-benchmark.yml @@ -0,0 +1,260 @@ +name: Benchmark + +on: + schedule: + - cron: '30 3 * * *' + workflow_dispatch: + +env: + CARGO_TERM_COLOR: always + +jobs: + config: + name: Prepare config + runs-on: ubuntu-latest + outputs: + benchmark_date: ${{ steps.set-benchmark-date.outputs.benchmark_date }} + benchmark_impls: ${{ steps.set-implements.outputs.benchmark_impls }} + benchmark_rounds: ${{ steps.set-rounds.outputs.benchmark_rounds }} + benchmark_duration: ${{ steps.set-duration.outputs.benchmark_duration }} + steps: + - name: Set date + id: set-benchmark-date + run: | + BENCHMARK_DATE=$(date -u +"%Y-%m-%d") + echo $BENCHMARK_DATE + echo "benchmark_date=$BENCHMARK_DATE" >> $GITHUB_OUTPUT + - name: Set implements + id: set-implements + run: | + IMPLS="lsquic tquic" + echo "benchmark_impls=$IMPLS" >> $GITHUB_OUTPUT + - name: Set rounds + id: set-rounds + run: | + ROUNDS=5 + echo "benchmark_rounds=$ROUNDS" >> $GITHUB_OUTPUT + - name: Set benchmark duration + id: set-duration + run: | + DURATION=120 + echo "benchmark_duration=$DURATION" >> $GITHUB_OUTPUT + + build_tquic: + name: Build tquic + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: 'recursive' + - name: Update rust + run: rustup update + - name: Build tquic + run: | + cargo build --all --release + cp target/release/tquic_server tquic_server + cp target/release/tquic_client tquic_client + - name: Build start script + run: | + echo $'#!/bin/bash\ncd "$(dirname "$0")"\nchmod u+x ./tquic_server\n./tquic_server --send-udp-payload-size 1350 --log-level OFF --root ../files --disable-stateless-reset -l 0.0.0.0:4433 -c ../cert/cert.crt -k ../cert/cert.key &' > start_tquic.sh + chmod u+x start_tquic.sh + - name: Upload tquic_server + uses: actions/upload-artifact@v4 + with: + name: tquic_server_bin + path: | + tquic_server + start_tquic.sh + - name: Upload tquic_client + uses: actions/upload-artifact@v4 + with: + name: tquic_client_bin + path: tquic_client + + build_lsquic: + name: Build lsquic + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + repository: 'litespeedtech/lsquic' + path: lsquic + submodules: 'recursive' + - name: Build lsquic + run: | + git clone https://boringssl.googlesource.com/boringssl + cd boringssl + git checkout 9fc1c33e9c21439ce5f87855a6591a9324e569fd + cmake . && make + BORINGSSL=$PWD + cd ../lsquic + sudo apt install libevent-dev + cmake -DBORINGSSL_DIR=$BORINGSSL . + make + cp bin/http_server ../lsquic_server + - name: Build start script + run: | + echo $'#!/bin/bash\ncd "$(dirname "$0")"\nchmod u+x ./lsquic_server\n./lsquic_server -c tquic_benchmark,../cert/cert.crt,../cert/cert.key -s 0.0.0.0:4433 -r ../files -L crit > lsquic.log 2>&1 &' > start_lsquic.sh + chmod u+x start_lsquic.sh + - name: Upload lsquic server + uses: actions/upload-artifact@v4 + with: + name: lsquic_server_bin + path: | + lsquic_server + start_lsquic.sh + + gen_cert: + name: Generate cert + runs-on: ubuntu-latest + steps: + - name: Generate cert + run: | + openssl genrsa -out cert.key 2048 + openssl req -new -x509 -key cert.key -out cert.crt -days 365 -subj "/CN=tquic_benchmark" + - name: Upload cert + uses: actions/upload-artifact@v4 + with: + name: cert + path: cert.* + + gen_files: + name: Generate files + runs-on: ubuntu-latest + steps: + - name: Generate files + run: | + head -c 1K /dev/urandom > file_1K + head -c 15K /dev/urandom > file_15K + head -c 50K /dev/urandom > file_50K + head -c 2M /dev/urandom > file_2M + - name: Upload files + uses: actions/upload-artifact@v4 + with: + name: files + path: file_* + + run_long_conn: + name: Run long connection scenario benchmark + needs: [ config, build_tquic, build_lsquic, gen_cert, gen_files ] + runs-on: ubuntu-latest + strategy: + matrix: + file: [ 15K, 50K, 2M ] + conn: [ 10 ] + stream: [ 1, 10 ] + steps: + - name: Download all + uses: actions/download-artifact@v4 + - name: Display structure of downloaded files + run: ls -R + - name: Benchmark + run: | + chmod u+x ./tquic_client_bin/tquic_client + for((round=0;round<${{ needs.config.outputs.benchmark_rounds }};round++));do + for impl in ${{ needs.config.outputs.benchmark_impls }};do + sh ${impl}_server_bin/start_${impl}.sh + pgrep ${impl}_server + sleep 1 + ./tquic_client_bin/tquic_client https://tquic_benchmark:4433/file_${{ matrix.file }} --connect-to 127.0.0.1:4433 --threads ${{ matrix.conn }} --max-concurrent-conns 1 --max-concurrent-requests ${{ matrix.stream }} --max-requests-per-conn 0 --total-requests-per-thread 0 -d ${{ needs.config.outputs.benchmark_duration }} --disable-stateless-reset --send-batch-size 1 --recv-udp-payload-size 1350 --send-udp-payload-size 1350 --log-level OFF > client.log 2>&1 + cat client.log | grep "finished in" | awk '{print $4}' > benchmark_long_${impl}_${{ matrix.file }}_${{ matrix.conn }}_${{ matrix.stream }}.${round}.${{ needs.config.outputs.benchmark_date }} + killall ${impl}_server + sleep 1 + done + done + - name: Upload benchmark result + uses: actions/upload-artifact@v4 + with: + name: benchmark_long_${{ matrix.file }}_${{ matrix.conn }}_${{ matrix.stream }}.${{ needs.config.outputs.benchmark_date }} + path: benchmark_long_* + retention-days: 90 + + run_short_conn: + name: Run short connection scenario benchmark + needs: [ config, build_tquic, build_lsquic, gen_cert, gen_files ] + runs-on: ubuntu-latest + steps: + - name: Download all + uses: actions/download-artifact@v4 + - name: Display structure of downloaded files + run: ls -R + - name: Benchmark + run: | + chmod u+x ./tquic_client_bin/tquic_client + for((round=0;round<${{ needs.config.outputs.benchmark_rounds }};round++));do + for impl in ${{ needs.config.outputs.benchmark_impls }};do + sh ${impl}_server_bin/start_${impl}.sh + pgrep ${impl}_server + sleep 1 + ./tquic_client_bin/tquic_client https://tquic_benchmark:4433/file_1K --connect-to 127.0.0.1:4433 --threads 10 --max-concurrent-conns 1 --max-concurrent-requests 1 --max-requests-per-conn 1 --total-requests-per-thread 0 -d ${{ needs.config.outputs.benchmark_duration }} --disable-stateless-reset --send-batch-size 1 --recv-udp-payload-size 1350 --send-udp-payload-size 1350 --log-level OFF > client.log 2>&1 + cat client.log | grep "finished in" | awk '{print $4}' > benchmark_short_${impl}_1K_10_1.${round}.${{ needs.config.outputs.benchmark_date }} + killall ${impl}_server + sleep 1 + done + done + - name: Upload benchmark result + uses: actions/upload-artifact@v4 + with: + name: benchmark_short_1K_10_1.${{ needs.config.outputs.benchmark_date }} + path: benchmark_short_* + retention-days: 90 + + result: + runs-on: ubuntu-latest + needs: [ run_long_conn, run_short_conn ] + steps: + - name: Download plot tools + uses: actions/checkout@v4 + - name: Download all + uses: actions/download-artifact@v4 + with: + path: benchmark_result + - name: Download latest benchmark history + working-directory: ./benchmark_result + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + SCENARIO=("long" "short") + LONG_FILE_SIZES=("15K" "50K" "2M") + SHORT_FILE_SIZES=("1K") + LONG_CONNS=10 + SHORT_CONNS=10 + LONG_STREAMS=(1 10) + SHORT_STREAMS=(1) + DAYS=90 + for ((i=1; i<$DAYS; i++)); do + date=$(date -d "-$i day" +%Y-%m-%d) + download_cmd="gh run download" + for scen in "${SCENARIO[@]}"; do + if [ "$scen" == "long" ]; then + FILE_SIZES=("${LONG_FILE_SIZES[@]}") + CONNS=$LONG_CONNS + STREAMS=("${LONG_STREAMS[@]}") + else + FILE_SIZES=("${SHORT_FILE_SIZES[@]}") + CONNS=$SHORT_CONNS + STREAMS=("${SHORT_STREAMS[@]}") + fi + for size in "${FILE_SIZES[@]}"; do + for stream in "${STREAMS[@]}"; do + download_cmd+=" -n benchmark_${scen}_${size}_${CONNS}_${stream}.${date}" + done + done + done + echo "$download_cmd" + eval "$download_cmd" || echo "" + done + - name: Display structure of downloaded files + run: ls -R benchmark_result + - name: Install dependencies + run: | + sudo apt install python3-matplotlib + pip3 install prettytable termcolor + - name: Plot and print all benchmark results + run: python3 .github/workflows/plot-benchmark.py ./benchmark_result + - name: Store all benchmark results + uses: actions/upload-artifact@v4 + with: + name: benchmark_all + path: benchmark_all* + diff --git a/.github/workflows/tquic-fct.yml b/.github/workflows/tquic-fct.yml index 1a9ff85c..acc26f0d 100644 --- a/.github/workflows/tquic-fct.yml +++ b/.github/workflows/tquic-fct.yml @@ -37,7 +37,7 @@ jobs: cd quic-interop-runner pip3 install -r requirements.txt - - name: Install dependences + - name: Install dependencies run: | sudo modprobe ip6table_filter sudo add-apt-repository -y ppa:wireshark-dev/stable @@ -76,7 +76,7 @@ jobs: with: path: tools - - name: Install dependences + - name: Install dependencies run: | sudo apt install python3-matplotlib diff --git a/.github/workflows/tquic-goodput.yml b/.github/workflows/tquic-goodput.yml index 20ef8a9b..7402fe99 100644 --- a/.github/workflows/tquic-goodput.yml +++ b/.github/workflows/tquic-goodput.yml @@ -36,7 +36,7 @@ jobs: cd quic-interop-runner pip3 install -r requirements.txt - - name: Install dependences + - name: Install dependencies run: | sudo modprobe ip6table_filter sudo add-apt-repository -y ppa:wireshark-dev/stable @@ -74,7 +74,7 @@ jobs: with: path: tools - - name: Install dependences + - name: Install dependencies run: | sudo apt install python3-matplotlib diff --git a/.github/workflows/tquic-interop-all.yml b/.github/workflows/tquic-interop-all.yml index 36aa9ab3..b818e932 100644 --- a/.github/workflows/tquic-interop-all.yml +++ b/.github/workflows/tquic-interop-all.yml @@ -95,7 +95,7 @@ jobs: cd quic-interop-runner pip3 install -r requirements.txt - - name: Install dependences + - name: Install dependencies run: | sudo modprobe ip6table_filter sudo add-apt-repository -y ppa:wireshark-dev/stable @@ -141,7 +141,7 @@ jobs: with: path: tools - - name: Install dependences + - name: Install dependencies run: | sudo apt install python3-matplotlib diff --git a/.github/workflows/tquic-interop-main.yaml b/.github/workflows/tquic-interop-main.yaml index 06beec80..ba93ca6f 100644 --- a/.github/workflows/tquic-interop-main.yaml +++ b/.github/workflows/tquic-interop-main.yaml @@ -42,7 +42,7 @@ jobs: cd quic-interop-runner pip3 install -r requirements.txt - - name: Install dependences + - name: Install dependencies run: | sudo modprobe ip6table_filter sudo add-apt-repository -y ppa:wireshark-dev/stable diff --git a/CHANGELOG.md b/CHANGELOG.md index 2fef3693..03cda699 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [v0.12.0] - 2024-05-27 +# Added +- Buffer disordered zero rtt packets on the server endpoint +- Add dummy congestion controller for testing and expriments +- Tweak configurations and initialization of flow control +- Improve comments of bbr congestion control algorithm +- Add workflow and plot tools for benchmarking +- tquic_tools: tquic_tools: add the `version` option + +# Fixed +- Fix dropping datagrams from unknown connections on the client endpoint +- Fix handling restart from idle for bbr/bbr3 algorithms +- tquic_tools: resolve minor issues + + ## [v0.11.0] - 2024-05-08 ### Added @@ -217,6 +232,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Provide example clients and servers. +[v0.12.0]: https://github.com/tencent/tquic/compare/v0.11.0...v0.12.0 [v0.11.0]: https://github.com/tencent/tquic/compare/v0.10.0...v0.11.0 [v0.10.0]: https://github.com/tencent/tquic/compare/v0.9.0...v0.10.0 [v0.9.0]: https://github.com/tencent/tquic/compare/v0.8.1...v0.9.0 diff --git a/Cargo.toml b/Cargo.toml index b918650e..51a03349 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic" -version = "0.11.0" +version = "0.12.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" @@ -42,6 +42,7 @@ strum = "0.24" strum_macros = "0.24" rand = "0.8.5" smallvec = { version = "1.10", features = ["serde", "union"] } +hashlru = "0.11" serde = { version = "1.0.139", features = ["derive"] } serde_json = { version = "1.0", features = ["preserve_order"] } serde_derive = "1.0" diff --git a/include/tquic.h b/include/tquic.h index efd56a97..574b160c 100644 --- a/include/tquic.h +++ b/include/tquic.h @@ -336,21 +336,29 @@ void quic_config_set_send_udp_payload_size(struct quic_config_t *config, uintptr /** * Set the `initial_max_data` transport parameter. It means the initial * value for the maximum amount of data that can be sent on the connection. + * The value is capped by the setting `max_connection_window`. + * The default value is `10485760`. */ void quic_config_set_initial_max_data(struct quic_config_t *config, uint64_t v); /** * Set the `initial_max_stream_data_bidi_local` transport parameter. + * The value is capped by the setting `max_stream_window`. + * The default value is `5242880`. */ void quic_config_set_initial_max_stream_data_bidi_local(struct quic_config_t *config, uint64_t v); /** * Set the `initial_max_stream_data_bidi_remote` transport parameter. + * The value is capped by the setting `max_stream_window`. + * The default value is `2097152`. */ void quic_config_set_initial_max_stream_data_bidi_remote(struct quic_config_t *config, uint64_t v); /** * Set the `initial_max_stream_data_uni` transport parameter. + * The value is capped by the setting `max_stream_window`. + * The default value is `1048576`. */ void quic_config_set_initial_max_stream_data_uni(struct quic_config_t *config, uint64_t v); @@ -436,11 +444,14 @@ void quic_config_set_multipath_algorithm(struct quic_config_t *config, /** * Set the maximum size of the connection flow control window. + * The default value is MAX_CONNECTION_WINDOW (15 MB). */ void quic_config_set_max_connection_window(struct quic_config_t *config, uint64_t v); /** * Set the maximum size of the stream flow control window. + * The value should not be greater than the setting `max_connection_window`. + * The default value is MAX_STREAM_WINDOW (6 MB). */ void quic_config_set_max_stream_window(struct quic_config_t *config, uint64_t v); @@ -497,6 +508,12 @@ void quic_config_set_cid_len(struct quic_config_t *config, uint8_t v); */ void quic_config_set_send_batch_size(struct quic_config_t *config, uint16_t v); +/** + * Set the buffer size for disordered zerortt packets on the server. + * Applicable to Server only. + */ +void quic_config_set_zerortt_buffer_size(struct quic_config_t *config, uint16_t v); + /** * Create a new TlsConfig. * The caller is responsible for the memory of the TlsConfig and should properly diff --git a/interop/run_endpoint.sh b/interop/run_endpoint.sh index 6eccce2b..030c6ec6 100644 --- a/interop/run_endpoint.sh +++ b/interop/run_endpoint.sh @@ -55,11 +55,16 @@ BBR3) COPA) CC_ALGOR="COPA" ;; +DUMMY) + CC_ALGOR="DUMMY" + ;; *) ;; esac -COMMON_ARGS="--keylog-file $SSLKEYLOGFILE --log-level DEBUG --log-file $LOG_DIR/$ROLE.log --idle-timeout 30000 --handshake-timeout 30000 --initial-rtt 100 --congestion-control-algor $CC_ALGOR" +# Note: You can add extra command-line options to tquic_client/tquic_sever by +# using the `EXTRA_ARGS` environment variable. +COMMON_ARGS="--keylog-file $SSLKEYLOGFILE --log-level DEBUG --log-file $LOG_DIR/$ROLE.log --idle-timeout 30000 --handshake-timeout 30000 --initial-rtt 100 --congestion-control-algor $CC_ALGOR $EXTRA_ARGS" if [ "$TESTCASE" != "transfer" ]; then COMMON_ARGS="$COMMON_ARGS --qlog-dir $QLOG_DIR" diff --git a/src/congestion_control/bbr.rs b/src/congestion_control/bbr.rs old mode 100755 new mode 100644 index 6f8f5e81..ba84caf6 --- a/src/congestion_control/bbr.rs +++ b/src/congestion_control/bbr.rs @@ -121,7 +121,7 @@ const SEND_QUANTUM_THRESHOLD_PACING_RATE: u64 = 1_200_000 / 8; /// BBR State Machine. /// -/// See . +/// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 3.4. #[derive(Debug, PartialEq, Eq)] enum BbrStateMachine { Startup, @@ -198,7 +198,7 @@ impl Default for AckState { /// BBR Congestion Control Algorithm. /// -/// See . +/// See draft-cardwell-iccrg-bbr-congestion-control-00. #[derive(Debug)] pub struct Bbr { /// Configurable parameters. @@ -265,7 +265,7 @@ pub struct Bbr { /// Whether a roundtrip in ProbeRTT state ends. probe_rtt_round_done: bool, - /// Whether in packet sonservation mode. + /// Whether in packet conservation mode. packet_conservation: bool, /// Cwnd before loss recovery. @@ -333,7 +333,8 @@ impl Bbr { } /// Initialization Steps. - /// See . + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.3.1. fn init(&mut self) { self.rtprop = self.config.initial_rtt.unwrap_or(Duration::MAX); self.rtprop_stamp = Instant::now(); @@ -350,21 +351,21 @@ impl Bbr { self.enter_startup(); } - /// See . + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.1.1.3. fn init_round_counting(&mut self) { self.round.next_round_delivered = 0; self.round.round_count = 0; self.round.is_round_start = false; } - /// See . + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.3.2.2. fn init_full_pipe(&mut self) { self.full_pipe.is_filled_pipe = false; self.full_pipe.full_bw = 0; self.full_pipe.full_bw_count = 0; } - // See https://datatracker.ietf.org/doc/html/draft-cardwell-iccrg-bbr-congestion-control-00#section-4.2.1 + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.1 fn init_pacing_rate(&mut self) { // When a BBR flow starts it has no BBR.BtlBw estimate. So in this case // it sets an initial pacing rate based on the transport sender implementation's @@ -378,14 +379,24 @@ impl Bbr { self.pacing_rate = (self.pacing_gain * nominal_bandwidth) as u64; } - /// See . + /// Enter the Startup state + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.3.2.1. fn enter_startup(&mut self) { self.state = BbrStateMachine::Startup; + + // To achieve this rapid probing in the smoothest possible fashion, upon + // entry into Startup state BBR sets BBR.pacing_gain and BBR.cwnd_gain + // to BBRHighGain, the minimum gain value that will allow the sending + // rate to double each round. self.pacing_gain = HIGH_GAIN; self.cwnd_gain = HIGH_GAIN; } - /// See . + /// Estimate whether the pipe is full by looking for a plateau in the + /// BBR.BtlBw estimate. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.3.2.2. fn check_full_pipe(&mut self) { // no need to check for a full pipe now if self.is_filled_pipe() @@ -400,26 +411,36 @@ impl Bbr { // record new baseline level self.full_pipe.full_bw = self.btlbw; self.full_pipe.full_bw_count = 0; - return; } // another round w/o much growth self.full_pipe.full_bw_count += 1; + // BBR waits three rounds in order to have solid evidence that the + // sender is not detecting a delivery-rate plateau that was temporarily + // imposed by the receive window. + // This three-round threshold was validated by YouTube experimental data. if self.full_pipe.full_bw_count >= FULL_BW_COUNT_THRESHOLD { self.full_pipe.is_filled_pipe = true; } } - /// See . + /// Update the virtual time tracked by BBR.round_count. + /// + /// BBR tracks time for the BBR.BtlBw filter window using a virtual time + /// tracked by BBR.round_countt, a count of "packet-timed" round-trips. + /// The BBR.round_count counts packet-timed round trips by recording state + /// about a sentinel packet, and waiting for an ACK of any data packet that + /// was sent after that sentinel packet. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.1.1.3. fn update_round(&mut self) { if self.ack_state.packet_delivered >= self.round.next_round_delivered { self.round.next_round_delivered = self.delivery_rate_estimator.delivered(); self.round.round_count += 1; self.round.is_round_start = true; - // After one round-trip in Fast Recovery: - // BBR.packet_conservation = false + // After one round-trip in Fast Recovery, exit the packet conservation mode. self.packet_conservation = false; } else { self.round.is_round_start = false; @@ -436,30 +457,37 @@ impl Bbr { self.round.is_round_start } - /// See . + /// Try to update the pacing rate using the given pacing_gain + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.1. fn set_pacing_rate_with_gain(&mut self, pacing_gain: f64) { let rate = (pacing_gain * self.btlbw as f64) as u64; - // on each data ACK BBR updates its pacing rate to - // be proportional to BBR.BtlBw, as long as it estimates that it has - // filled the pipe (BBR.filled_pipe is true; see the "Startup" section - // below for details), or doing so increases the pacing rate. + // On each data ACK BBR updates its pacing rate to be proportional to + // BBR.BtlBw, as long as it estimates that it has filled the pipe, or + // doing so increases the pacing rate. if self.is_filled_pipe() || rate > self.pacing_rate { self.pacing_rate = rate; } } - /// See . + /// In Drain, BBR aims to quickly drain any queue created in Startup by + /// switching to a pacing_gain well below 1.0. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.3.3. fn enter_drain(&mut self) { self.state = BbrStateMachine::Drain; - // pace slowly - self.pacing_gain = 1.0 / HIGH_GAIN; - // maintain cwnd - self.cwnd_gain = HIGH_GAIN; + + // It uses a pacing_gain that is the inverse of the value used during + // Startup, which drains the queue in one round. + self.pacing_gain = 1.0 / HIGH_GAIN; // pace slowly + self.cwnd_gain = HIGH_GAIN; // maintain cwnd } - /// See . - /// Target cwnd. + /// Calculate the target cwnd, which is the upper bound on the volume of data BBR + /// allows in flight. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.3.2 Target cwnd. fn inflight(&self, gain: f64) -> u64 { if self.rtprop == Duration::MAX { // no valid RTT samples yet @@ -470,29 +498,52 @@ impl Bbr { // and receiving hosts to reach full utilization even in high-throughput // environments using offloading mechanisms. let quanta = 3 * self.send_quantum; + + // The "estimated_bdp" term allows enough packets in flight to fully + // utilize the estimated BDP of the path, by allowing the flow to send + // at BBR.BtlBw for a duration of BBR.RTprop. let estimated_bdp = self.btlbw as f64 * self.rtprop.as_secs_f64(); + // Scaling up the BDP by cwnd_gain, selected by the BBR state machine to + // be above 1.0 at all times, bounds in-flight data to a small multiple + // of the BDP, in order to handle common network and receiver pathologies, + // such as delayed, stretched, or aggregated ACKs. (gain * estimated_bdp) as u64 + quanta } - /// See . + /// On each ACK, BBR calculates the BBR.target_cwnd. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.3.2. fn update_target_cwnd(&mut self) { self.target_cwnd = self.inflight(self.cwnd_gain); } - /// See . + /// Check and try to enter or leave Drain state. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.3.3. fn check_drain(&mut self, bytes_in_flight: u64, now: Instant) { + // In Startup, when the BBR "full pipe" estimator estimates that BBR has + // filled the pipe, BBR switches to its Drain state. if self.state == BbrStateMachine::Startup && self.is_filled_pipe() { self.enter_drain(); } + // In Drain, when the number of packets in flight matches the estimated + // BDP, meaning BBR estimates that the queue has been fully drained but + // the pipe is still full, then BBR leaves Drain and enters ProbeBW. if self.state == BbrStateMachine::Drain && bytes_in_flight <= self.inflight(1.0) { // we estimate queue is drained self.enter_probe_bw(now); } } - /// See . + /// Enter the ProbeBW state. + /// BBR flows spend the vast majority of their time in ProbeBW state, + /// probing for bandwidth using an approach called gain cycling, which + /// helps BBR flows reach high throughput, low queuing delay, and + /// convergence to a fair share of bandwidth. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.3.4.3. fn enter_probe_bw(&mut self, now: Instant) { self.state = BbrStateMachine::ProbeBW; self.pacing_gain = 1.0; @@ -504,7 +555,6 @@ impl Bbr { // gain cycling by randomly picking an initial phase, from among all but // the 3/4 phase, when entering ProbeBW. self.cycle_index = GAIN_CYCLE_LEN - 1 - rand::thread_rng().gen_range(0..GAIN_CYCLE_LEN - 1); - self.advance_cycle_phase(now); } @@ -515,7 +565,7 @@ impl Bbr { } } - /// Advance cycle phase during probe_bw state. + /// Advance cycle phase during ProbeBW state. fn advance_cycle_phase(&mut self, now: Instant) { // BBR flows spend the vast majority of their time in ProbeBW state, // probing for bandwidth using an approach called gain cycling, which @@ -526,6 +576,7 @@ impl Bbr { self.pacing_gain = PACING_GAIN_CYCLE[self.cycle_index]; } + /// Check if it's time to advance to the next gain cycle phase in ProbeBW state. fn is_next_cycle_phase(&mut self, now: Instant) -> bool { // Each cycle phase normally lasts for roughly BBR.RTprop. let is_full_length = now.saturating_duration_since(self.cycle_stamp) > self.rtprop; @@ -550,11 +601,16 @@ impl Bbr { is_full_length } - /// See . + /// When restarting from idle, BBR leaves its cwnd as-is and paces + /// packets at exactly BBR.BtlBw, aiming to return as quickly as possible + /// to its target operating point of rate balance and a full pipe. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.3.4.4. fn handle_restart_from_idle(&mut self, bytes_in_flight: u64) { - // When restarting from idle, BBR leaves its cwnd as-is and paces - // packets at exactly BBR.BtlBw, aiming to return as quickly as possible - // to its target operating point of rate balance and a full pipe. + // If the flow's BBR.state is ProbeBW, and the flow is + // application-limited, and there are no packets in flight currently, + // then at the moment the flow sends one or more packets BBR sets + // BBR.pacing_rate to exactly BBR.BtlBw. if bytes_in_flight == 0 && self.delivery_rate_estimator.is_app_limited() { self.is_idle_restart = true; @@ -564,8 +620,12 @@ impl Bbr { } } - /// See . /// Remember cwnd. + /// + /// It helps remember and restore the last-known good cwnd (the latest cwnd + /// unmodulated by loss recovery or ProbeRTT) + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.3.4. fn save_cwnd(&mut self) { self.prior_cwnd = if !self.in_recovery && self.state != BbrStateMachine::ProbeRTT { self.cwnd @@ -574,17 +634,26 @@ impl Bbr { } } - /// Restore cwnd + /// Restore cwnd. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.3.4. fn restore_cwnd(&mut self) { self.cwnd = self.cwnd.max(self.prior_cwnd) } + /// Return cwnd for ProbeRTT state. fn probe_rtt_cwnd(&self) -> u64 { self.config.min_cwnd } - /// See . + /// Check and try to enter or leave ProbeRTT state. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.3.5. fn check_probe_rtt(&mut self, now: Instant, bytes_in_flight: u64) { + // In any state other than ProbeRTT itself, if the RTProp estimate has + // not been updated (i.e., by getting a lower RTT measurement) for more + // than ProbeRTTInterval = 10 seconds, then BBR enters ProbeRTT and + // reduces the cwnd to a minimal value, BBRMinPipeCwnd (four packets). if self.state != BbrStateMachine::ProbeRTT && self.is_rtprop_expired && !self.is_idle_restart @@ -603,6 +672,7 @@ impl Bbr { self.is_idle_restart = false; } + /// Enter the ProbeRTT state fn enter_probe_rtt(&mut self) { self.state = BbrStateMachine::ProbeRTT; @@ -610,6 +680,7 @@ impl Bbr { self.cwnd_gain = 1.0; } + /// Process for the ProbeRTT state fn handle_probe_rtt(&mut self, now: Instant, bytes_in_flight: u64) { // Ignore low rate samples during ProbeRTT. MarkConnectionAppLimited. // C.app_limited = (BW.delivered + packets_in_flight) ? : 1 @@ -620,9 +691,11 @@ impl Bbr { self.probe_rtt_round_done = true; } + // After maintaining BBRMinPipeCwnd or fewer packets in flight for + // at least ProbeRTTDuration (200 ms) and one round trip, BBR leaves + // ProbeRTT. if self.probe_rtt_round_done && now >= probe_rtt_done_stamp { self.rtprop_stamp = now; - self.restore_cwnd(); self.exit_probe_rtt(now); } @@ -634,6 +707,8 @@ impl Bbr { } } + /// BBR leaves ProbeRTT and transitions to either Startup or ProbeBW, + /// depending on whether it estimates the pipe was filled already. fn exit_probe_rtt(&mut self, now: Instant) { if self.is_filled_pipe() { self.enter_probe_bw(now); @@ -642,6 +717,7 @@ impl Bbr { } } + /// On every ACK, the BBR updates its network path model and state machine fn update_model_and_state(&mut self, now: Instant) { self.update_btlbw(); self.check_cycle_phase(now); @@ -651,13 +727,17 @@ impl Bbr { self.check_probe_rtt(now, self.stats.bytes_in_flight); } + /// BBR adjusts its control parameters to adapt to the updated model. fn update_control_parameters(&mut self) { self.set_pacing_rate(); self.set_send_quantum(); self.set_cwnd(); } - /// See . + /// For every ACK that acknowledges some data packets as delivered, BBR + /// update the BBR.BtlBw estimator as follows. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.1.1.5. fn update_btlbw(&mut self) { self.update_round(); @@ -672,7 +752,10 @@ impl Bbr { } } - /// See . + /// On every ACK that provides an RTT sample BBR updates the BBR.RTprop + /// estimator as follows. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.1.2.3. fn update_rtprop(&mut self, now: Instant) { let sample_rtt = self.delivery_rate_estimator.sample_rtt(); @@ -687,21 +770,25 @@ impl Bbr { } } - /// See . + /// BBR updates the pacing rate on each ACK as follows. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.1. fn set_pacing_rate(&mut self) { self.set_pacing_rate_with_gain(self.pacing_gain); } - /// See . + /// On each ACK, BBR runs BBRSetSendQuantum() to update BBR.send_quantum + /// as follows. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.2. fn set_send_quantum(&mut self) { // A BBR implementation MAY use alternate approaches to select a // BBR.send_quantum, as appropriate for the CPU overheads anticipated // for senders and receivers, and buffering considerations anticipated - // in the network path. However, for the sake of the network and other + // in the network path. However, for the sake of the network and other // users, a BBR implementation SHOULD attempt to use the smallest // feasible quanta. - // Adjust according to: - // . + // Adjust according to draft-cardwell-iccrg-bbr-congestion-control-02 let floor = if self.pacing_rate < SEND_QUANTUM_THRESHOLD_PACING_RATE { self.config.max_datagram_size } else { @@ -713,7 +800,12 @@ impl Bbr { self.send_quantum = (self.pacing_rate / 1000).clamp(floor, 64 * 1024); } - /// See . + /// Upon every ACK in Fast Recovery, run the following steps, which help + /// ensure packet conservation on the first round of recovery, and sending + /// at no more than twice the current delivery rate on later rounds of + /// recovery. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.3.4. fn modulate_cwnd_for_recovery(&mut self, bytes_in_flight: u64) { if self.ack_state.newly_lost_bytes > 0 { self.cwnd = self @@ -729,7 +821,11 @@ impl Bbr { } } - /// See . + /// To quickly reduce the volume of in-flight data and drain the bottleneck + /// queue, thereby allowing measurement of BBR.RTprop, BBR bounds the cwnd + /// to BBRMinPipeCwnd, the minimal value that allows pipelining. + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.3.5. fn modulate_cwnd_for_probe_rtt(&mut self) { // BBR bounds the cwnd in ProbeRTT. if self.state == BbrStateMachine::ProbeRTT { @@ -737,7 +833,9 @@ impl Bbr { } } - // See https://datatracker.ietf.org/doc/html/draft-cardwell-iccrg-bbr-congestion-control-00#section-4.2.3.6 + /// Adjust the congestion window + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.3.6 fn set_cwnd(&mut self) { let bytes_in_flight = self.stats.bytes_in_flight; @@ -745,6 +843,10 @@ impl Bbr { self.modulate_cwnd_for_recovery(bytes_in_flight); if !self.packet_conservation { + // If BBR has measured enough samples to achieve confidence that it + // has filled the pipe, then it increases its cwnd based on the + // number of packets delivered, while bounding its cwnd to be no + // larger than the BBR.target_cwnd adapted to the estimated BDP. if self.is_filled_pipe() { self.cwnd = self .target_cwnd @@ -752,41 +854,57 @@ impl Bbr { } else if self.cwnd < self.target_cwnd || self.delivery_rate_estimator.delivered() < self.config.initial_cwnd { + // Otherwise, if the cwnd is below the target, or the sender has + // marked so little data delivered (less than InitialCwnd) that + // it does not yet judge its BBR.BtlBw estimate and BBR.target_cwnd + // as useful, then it increases cwnd without bounding it to be + // below the target. self.cwnd += self.ack_state.newly_acked_bytes; } + + // Finally, BBR imposes a floor of BBRMinPipeCwnd in order to allow + // pipelining even with small BDPs. self.cwnd = self.cwnd.max(self.config.min_cwnd); } self.modulate_cwnd_for_probe_rtt(); } - /// See . + /// Enter loss recovery + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.3.4. fn enter_recovery(&mut self, now: Instant) { self.save_cwnd(); self.recovery_epoch_start = Some(now); + + // Upon entering Fast Recovery, set cwnd to the number of packets still + // in flight (allowing at least one for a fast retransmit): self.cwnd = self.stats.bytes_in_flight + self .ack_state .newly_acked_bytes .max(self.config.max_datagram_size); + + // Note: After one round-trip in Fast Recovery, BBR.packet_conservation + // will reset to false self.packet_conservation = true; self.in_recovery = true; - // After one round-trip in Fast Recovery: - // BBR.packet_conservation = false self.round.next_round_delivered = self.delivery_rate_estimator.delivered(); } + /// Exit loss recovery + /// + /// See draft-cardwell-iccrg-bbr-congestion-control-00 Section 4.2.3.4. fn exit_recovery(&mut self) { - // Upon exiting loss recovery (RTO recovery or Fast Recovery), either by - // repairing all losses or undoing recovery, BBR restores the best-known - // cwnd value we had upon entering loss recovery self.recovery_epoch_start = None; - self.packet_conservation = false; self.in_recovery = false; + // Upon exiting loss recovery (RTO recovery or Fast Recovery), either by + // repairing all losses or undoing recovery, BBR restores the best-known + // cwnd value we had upon entering loss recovery self.restore_cwnd(); } } @@ -803,8 +921,8 @@ impl CongestionController for Bbr { self.stats.bytes_lost_in_total, ); - self.stats.bytes_in_flight += packet.sent_size as u64; self.handle_restart_from_idle(self.stats.bytes_in_flight); + self.stats.bytes_in_flight += packet.sent_size as u64; } fn begin_ack(&mut self, now: Instant, bytes_in_flight: u64) { diff --git a/src/congestion_control/bbr3.rs b/src/congestion_control/bbr3.rs index 150d1e02..f2a40994 100644 --- a/src/congestion_control/bbr3.rs +++ b/src/congestion_control/bbr3.rs @@ -1850,8 +1850,8 @@ impl CongestionController for Bbr3 { self.stats.bytes_lost_in_total, ); - self.stats.bytes_in_flight += packet.sent_size as u64; self.handle_restart_from_idle(now, self.stats.bytes_in_flight); + self.stats.bytes_in_flight += packet.sent_size as u64; } fn begin_ack(&mut self, now: Instant, bytes_in_flight: u64) { diff --git a/src/congestion_control/congestion_control.rs b/src/congestion_control/congestion_control.rs index e2257b24..a112e216 100644 --- a/src/congestion_control/congestion_control.rs +++ b/src/congestion_control/congestion_control.rs @@ -32,6 +32,7 @@ pub use copa::Copa; pub use copa::CopaConfig; pub use cubic::Cubic; pub use cubic::CubicConfig; +pub use dummy::Dummy; pub use hystart_plus_plus::HystartPlusPlus; /// Available congestion control algorithms. @@ -61,6 +62,10 @@ pub enum CongestionControlAlgorithm { /// and delay can be configured via a user-specified parameter. /// (Experimental) Copa, + + /// Dummy is a simple congestion controller with a static congestion window. + /// It is intended to be used for testing and experiments. + Dummy, } impl FromStr for CongestionControlAlgorithm { @@ -75,6 +80,8 @@ impl FromStr for CongestionControlAlgorithm { Ok(CongestionControlAlgorithm::Bbr3) } else if algor.eq_ignore_ascii_case("copa") { Ok(CongestionControlAlgorithm::Copa) + } else if algor.eq_ignore_ascii_case("dummy") { + Ok(CongestionControlAlgorithm::Dummy) } else { Err(Error::InvalidConfig("unknown".into())) } @@ -208,6 +215,7 @@ pub fn build_congestion_controller(conf: &RecoveryConfig) -> Box Box::new(Dummy::new(initial_cwnd)), } } @@ -235,6 +243,9 @@ mod tests { ("copa", Ok(CongestionControlAlgorithm::Copa)), ("Copa", Ok(CongestionControlAlgorithm::Copa)), ("COPA", Ok(CongestionControlAlgorithm::Copa)), + ("dummy", Ok(CongestionControlAlgorithm::Dummy)), + ("Dummy", Ok(CongestionControlAlgorithm::Dummy)), + ("DUMMY", Ok(CongestionControlAlgorithm::Dummy)), ("cubci", Err(Error::InvalidConfig("unknown".into()))), ]; @@ -287,6 +298,7 @@ mod bbr3; mod copa; mod cubic; mod delivery_rate; +mod dummy; mod hystart_plus_plus; mod minmax; mod pacing; diff --git a/src/congestion_control/dummy.rs b/src/congestion_control/dummy.rs new file mode 100644 index 00000000..2bcdf461 --- /dev/null +++ b/src/congestion_control/dummy.rs @@ -0,0 +1,179 @@ +// Copyright (c) 2024 The TQUIC Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![allow(unused_variables)] + +use std::time::Instant; + +use super::CongestionController; +use super::CongestionStats; +use crate::connection::rtt::RttEstimator; +use crate::connection::space::SentPacket; + +/// Dummy is a simple congestion controller with a static congestion window. +/// It is intended to be used for testing and experiments. +#[derive(Debug)] +pub struct Dummy { + /// Congestion window in bytes. + cwnd: u64, + + /// Congestion statistics. + stats: CongestionStats, +} + +impl Dummy { + pub fn new(initial_cwnd: u64) -> Self { + Self { + cwnd: initial_cwnd, + stats: Default::default(), + } + } +} + +impl CongestionController for Dummy { + fn name(&self) -> &str { + "DUMMY" + } + + fn on_sent(&mut self, now: Instant, packet: &mut SentPacket, bytes_in_flight: u64) { + let sent_bytes = packet.sent_size as u64; + self.stats.bytes_in_flight = bytes_in_flight; + self.stats.bytes_sent_in_total = self.stats.bytes_sent_in_total.saturating_add(sent_bytes); + } + + fn begin_ack(&mut self, now: Instant, bytes_in_flight: u64) { + // Do nothing. + } + + fn on_ack( + &mut self, + packet: &mut SentPacket, + now: Instant, + app_limited: bool, + rtt: &RttEstimator, + bytes_in_flight: u64, + ) { + let acked_bytes = packet.sent_size as u64; + self.stats.bytes_in_flight = bytes_in_flight; + self.stats.bytes_acked_in_total = + self.stats.bytes_acked_in_total.saturating_add(acked_bytes); + } + + fn end_ack(&mut self) { + // Do nothing. + } + + fn on_congestion_event( + &mut self, + now: Instant, + packet: &SentPacket, + is_persistent_congestion: bool, + lost_bytes: u64, + bytes_in_flight: u64, + ) { + self.stats.bytes_lost_in_total = self.stats.bytes_lost_in_total.saturating_add(lost_bytes); + self.stats.bytes_in_flight = bytes_in_flight; + } + + fn in_slow_start(&self) -> bool { + false + } + + fn in_recovery(&self, sent_time: Instant) -> bool { + false + } + + fn congestion_window(&self) -> u64 { + self.cwnd + } + + fn initial_window(&self) -> u64 { + self.cwnd + } + + fn minimal_window(&self) -> u64 { + self.cwnd + } + + fn stats(&self) -> &CongestionStats { + &self.stats + } + + fn pacing_rate(&self) -> Option { + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn dummy_init() { + let d = Dummy::new(1200 * 10); + assert_eq!(d.name(), "DUMMY"); + assert_eq!(d.congestion_window(), 1200 * 10); + assert_eq!(d.initial_window(), 1200 * 10); + assert_eq!(d.minimal_window(), 1200 * 10); + + assert_eq!(d.in_slow_start(), false); + assert_eq!(d.in_recovery(Instant::now()), false); + assert_eq!(d.stats().bytes_in_flight, 0); + assert_eq!(d.pacing_rate(), None); + } + + #[test] + fn dummy_stats() { + let mut d = Dummy::new(1200 * 10); + let rtt = Duration::from_millis(100); + let rtt_estimator = RttEstimator::new(rtt); + let now = Instant::now(); + + // Sent and acked a packet + let mut pkt = SentPacket { + pkt_num: 0, + ack_eliciting: true, + in_flight: true, + sent_size: 1200, + ..SentPacket::default() + }; + d.on_sent(now, &mut pkt, 1200); + assert_eq!(d.stats().bytes_in_flight, 1200); + assert_eq!(d.stats().bytes_sent_in_total, 1200); + + let now = now + rtt; + d.begin_ack(now, 1200); + d.on_ack(&mut pkt, now, true, &rtt_estimator, 0); + d.end_ack(); + assert_eq!(d.stats().bytes_in_flight, 0); + assert_eq!(d.stats().bytes_acked_in_total, 1200); + + // Sent and lost a packet + let mut pkt = SentPacket { + pkt_num: 0, + ack_eliciting: true, + in_flight: true, + sent_size: 1400, + ..SentPacket::default() + }; + d.on_sent(now, &mut pkt, 1400); + assert_eq!(d.stats().bytes_in_flight, 1400); + assert_eq!(d.stats().bytes_sent_in_total, 2600); + + d.on_congestion_event(now, &pkt, false, 1400, 0); + assert_eq!(d.stats().bytes_in_flight, 0); + assert_eq!(d.stats().bytes_lost_in_total, 1400); + } +} diff --git a/src/connection/flowcontrol.rs b/src/connection/flowcontrol.rs index 0efd8f7b..0739bc9e 100644 --- a/src/connection/flowcontrol.rs +++ b/src/connection/flowcontrol.rs @@ -15,6 +15,23 @@ use std::time::Duration; use std::time::Instant; +/// A flow control implementation that allows the size of the receive buffer to +/// be auto-tuned. +/// +/// The basic idea is to start with relatively small initial window size, and +/// then grow the window as necessary. For simplicity, auto-tuning may increase +/// the window size, but never decreases (contrast with congestion control). +/// +/// The ideal size of the window is one that is large enough that it can +/// encompass the bandwidth delay product (BDP) to the peer. +/// +/// The algorithm will compare the interval between successive flow control +/// window updates to the smoothed RTT. If the flow control window is too small +/// to keep up with the BDP, there will be a window update each RTT. +/// Alternatively, when the window is sized to the ideal, window updates can be +/// expected to occur with frequency corresponding to more than the 1 RTT +/// indicative of blocking, but not too much more. The default target chosen for +/// auto-tuning corresponds to 2 RTTs. #[derive(Default, Debug)] pub struct FlowControl { /// Number of bytes consumed (cumulative). @@ -47,9 +64,9 @@ pub struct FlowControl { } impl FlowControl { - pub fn new(max_data: u64, window: u64, max_window: u64) -> FlowControl { + pub fn new(window: u64, max_window: u64) -> FlowControl { FlowControl { - max_data, + max_data: window, window, max_window, ..FlowControl::default() @@ -86,7 +103,7 @@ impl FlowControl { /// Return true if the available window is smaller than the half /// of the current window. pub fn should_send_max_data(&self) -> bool { - (self.max_data - self.read_off) < (self.window / 2) + (self.max_data - self.read_off) * 2 < self.window } /// Get the next max_data limit which will be sent to the peer @@ -102,7 +119,7 @@ impl FlowControl { } /// Adjust the window size automatically. If the last update - /// is within 2 * srtt, increase the window size by 1.5, but + /// is within 2 * srtt, increase the window size by 2, but /// not exceeding the max_window. pub fn autotune_window(&mut self, now: Instant, srtt: Duration) { if let Some(last_updated) = self.last_updated { @@ -125,10 +142,10 @@ mod tests { #[test] fn fc_new() { - let flow_control = FlowControl::new(100, 10, 200); + let flow_control = FlowControl::new(100, 200); assert_eq!(flow_control.max_data(), 100); - assert_eq!(flow_control.window(), 10); + assert_eq!(flow_control.window(), 100); assert_eq!(flow_control.max_window, 200); assert_eq!(flow_control.read_off, 0); assert_eq!(flow_control.recv_off, 0); @@ -137,7 +154,7 @@ mod tests { #[test] fn fc_increase_recv_off() { - let mut fc = FlowControl::new(100, 10, 200); + let mut fc = FlowControl::new(100, 200); for (delta, total) in [(10, 10), (20, 30), (30, 60)] { fc.increase_recv_off(delta); @@ -147,19 +164,19 @@ mod tests { #[test] fn fc_update_logic() { - let mut fc = FlowControl::new(100, 10, 200); + let mut fc = FlowControl::new(100, 200); for (read_delta, read_off, should_send, max_data_next) in [ // 1. Initial state - (0, 0, false, 10), - // 2. Read 95 bytes - // available window is 5 == window / 2, not need to send max_data, - // max_data_next is 105 = read_off(95) + window(10) - (95, 95, false, 105), + (0, 0, false, 100), + // 2. Read 50 bytes + // available window is 50 == window / 2, not need to send max_data, + // max_data_next is 150 = read_off(50) + window(100) + (50, 50, false, 150), // 3. Read 1 bytes - // available window is 4 < window / 2, need to send max_data - // max_data_next is 106 = read_off(96) + window(10) - (1, 96, true, 106), + // available window is 49 < window / 2, need to send max_data + // max_data_next is 151 = read_off(51) + window(100) + (1, 51, true, 151), ] { fc.increase_read_off(read_delta); assert_eq!(fc.read_off, read_off); @@ -168,7 +185,7 @@ mod tests { } fc.update_max_data(Instant::now()); - assert_eq!(fc.max_data(), 106); + assert_eq!(fc.max_data(), 151); } #[test] @@ -177,18 +194,18 @@ mod tests { let max_window = 30; let now = Instant::now(); let srtt = Duration::from_millis(100); - let mut fc = FlowControl::new(100, window, max_window); + let mut fc = FlowControl::new(window, max_window); - // 1. Read 96 bytes, available window is 4 < window / 2, need to send max_data. - let read_off = 96; + // 1. Read 6 bytes, available window is 4 < window / 2, need to send max_data. + let read_off = 6; fc.increase_read_off(read_off); assert_eq!(fc.should_send_max_data(), true); - // max_data_next = read_off(96) + window(10) = 106 + // max_data_next = read_off(6) + window(10) = 16 let max_data_next = fc.max_data_next(); assert_eq!(max_data_next, read_off + fc.window); - // 2. Apply the new max_data limit(106), last_updated is set to now. + // 2. Apply the new max_data limit(16), last_updated is set to now. fc.update_max_data(now); assert_eq!(fc.max_data(), max_data_next); @@ -197,16 +214,16 @@ mod tests { // Window auto-tuned to 20 assert_eq!(fc.window, window * 2); - // 4. Read 1 byte, available window is 9 < window / 2, need to send max_data. - let read_off_delta = 1; + // 4. Read 5 byte, available window is 9 < window / 2, need to send max_data. + let read_off_delta = 5; fc.increase_read_off(read_off_delta); assert_eq!(fc.should_send_max_data(), true); - // max_data_next = read_off(97) + window(20) = 117 + // max_data_next = read_off(11) + window(20) = 31 let max_data_next = fc.max_data_next(); assert_eq!(max_data_next, read_off + read_off_delta + fc.window); - // 5. Apply the new max_data limit(117), last_updated is set to now. + // 5. Apply the new max_data limit(31), last_updated is set to now. fc.update_max_data(now); assert_eq!(fc.max_data(), max_data_next); @@ -219,8 +236,7 @@ mod tests { #[test] fn fc_ensure_window_lower_bound() { - let min_window = 10; - let mut fc = FlowControl::new(100, 10, 200); + let mut fc = FlowControl::new(10, 200); for (min_window, window) in [ // min_window < window, unchanged diff --git a/src/connection/stream.rs b/src/connection/stream.rs index 6c81efad..ed8bb69c 100644 --- a/src/connection/stream.rs +++ b/src/connection/stream.rs @@ -170,10 +170,6 @@ impl StreamMap { flow_control: flowcontrol::FlowControl::new( local_params.initial_max_data, - cmp::min( - local_params.initial_max_data / 2 * 3, - DEFAULT_CONNECTION_WINDOW, - ), max_connection_window, ), @@ -2026,11 +2022,7 @@ impl RecvBuf { /// Create a new receive-side stream buffer with given flow control limits. fn new(max_data: u64, max_window: u64) -> RecvBuf { RecvBuf { - flow_control: flowcontrol::FlowControl::new( - max_data, - cmp::min(max_data, DEFAULT_STREAM_WINDOW), - max_window, - ), + flow_control: flowcontrol::FlowControl::new(max_data, max_window), ..RecvBuf::default() } } @@ -3739,10 +3731,10 @@ mod tests { assert!(map.readable.contains(&0)); assert!(map.stream_shutdown(0, Shutdown::Read, 10).is_ok()); - // init_max_data: 14, window: 21, read_off: 10 + // init_max_data: 14, window: 14, read_off: 10 // available_window: 4 < window / 2, should update max_data. assert_eq!(map.flow_control.max_data(), 14); - assert_eq!(map.flow_control.max_data_next(), 31); + assert_eq!(map.flow_control.max_data_next(), 24); assert!(map.flow_control.should_send_max_data()); assert!(map.rx_almost_full); } @@ -3888,7 +3880,7 @@ mod tests { assert_eq!(map.concurrency_control, ConcurrencyControl::new(5, 5)); // Check connection-level flow control - assert_eq!(map.flow_control.window(), 150); + assert_eq!(map.flow_control.window(), 100); assert_eq!(map.flow_control.max_data(), 100); assert!( !map.flow_control.should_send_max_data(), @@ -4959,8 +4951,7 @@ mod tests { }; let mut map = StreamMap::new(true, 50, 50, local_tp); - // init window = initial_max_data /2 * 3 - assert_eq!(map.flow_control.window(), 30); + assert_eq!(map.flow_control.window(), 20); assert_eq!(map.flow_control.max_data(), 20); // 1. Receive a RESET_STREAM frame for a stream which has received some data @@ -4973,10 +4964,10 @@ mod tests { ); assert_eq!(map.on_reset_stream_frame_received(4, 0, 4), Ok(())); // map.flow_control.consumed = 4 - assert_eq!(map.flow_control.max_data_next(), 34); + assert_eq!(map.flow_control.max_data_next(), 24); assert!( !map.flow_control.should_send_max_data(), - "available_window = 16 > 15 = window/2, not update max_data" + "available_window = 16 > 10 = window/2, not update max_data" ); assert!(!map.rx_almost_full); @@ -4985,15 +4976,15 @@ mod tests { // stream_id: 8, max received offset: 1, final size: 2. let stream = map.get_or_create(8, false).unwrap(); assert_eq!( - stream.recv.write(0, Bytes::from_static(b"O"), false), + stream.recv.write(0, Bytes::from_static(b"QUICQUIC"), false), Ok(()) ); - assert_eq!(map.on_reset_stream_frame_received(8, 0, 2), Ok(())); - // map.flow_control.consumed = 6 - assert_eq!(map.flow_control.max_data_next(), 36); + assert_eq!(map.on_reset_stream_frame_received(8, 0, 8), Ok(())); + // map.flow_control.consumed = 12 + assert_eq!(map.flow_control.max_data_next(), 32); assert!( map.flow_control.should_send_max_data(), - "available_window = 14 < 15 = window/2, update max_data" + "available_window = 8 < 10 = window/2, update max_data" ); assert!(map.rx_almost_full); } @@ -5334,8 +5325,7 @@ mod tests { initial_max_streams_uni: 5, }; let mut map = StreamMap::new(true, 50, 50, local_tp); - // init window = initial_max_data /2 * 3 - assert_eq!(map.flow_control.window(), 30); + assert_eq!(map.flow_control.window(), 20); assert_eq!(map.flow_control.max_data(), 20); // Create stream 4 @@ -5351,19 +5341,19 @@ mod tests { // map.flow_control.consumed = 4 assert!( !map.flow_control.should_send_max_data(), - "available_window = 16 > 15 = window/2, not update max_data" + "available_window = 16 > 10 = window/2, not update max_data" ); assert!(!map.rx_almost_full); // Receive the second block of data of stream 4, should update max_data assert_eq!( - map.on_stream_frame_received(4, 4, 2, false, Bytes::from_static(b"GO")), + map.on_stream_frame_received(4, 4, 8, false, Bytes::from_static(b"QUICQUIC")), Ok(()) ); - // map.flow_control.consumed = 6 + // map.flow_control.consumed = 12 assert!( map.flow_control.should_send_max_data(), - "available_window = 14 < 15 = window/2, update max_data" + "available_window = 8 < 10 = window/2, update max_data" ); assert!(map.rx_almost_full); } diff --git a/src/endpoint.rs b/src/endpoint.rs index e04fd79a..dbe4635d 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -88,6 +88,10 @@ pub struct Endpoint { /// Used to send packet out. sender: Rc, + /// Buffer for ZeroRTT packets that arrive before Initial packets due to + /// potential misordering or loss of Initial packets. + buffer: PacketBuffer, + /// Packets generated by the endpoint. packets: PacketQueue, @@ -111,6 +115,7 @@ impl Endpoint { cid_lifetime: None, }); let trace_id = if is_server { "SERVER" } else { "CLIENT" }; + let buffer = PacketBuffer::new(config.zerortt_buffer_size); let packets = PacketQueue::new(config.send_batch_size); Self { @@ -123,6 +128,7 @@ impl Endpoint { cid_gen, handler, sender, + buffer, packets, closed: false, trace_id: trace_id.to_string(), @@ -224,8 +230,10 @@ impl Endpoint { } // Drop the datagram for unrecognized connection for client - if !self.is_server && self.config.stateless_reset { - self.send_stateless_reset(buf.len(), &hdr.dcid, local, remote)?; + if !self.is_server { + if self.config.stateless_reset { + self.send_stateless_reset(buf.len(), &hdr.dcid, local, remote)?; + } return Ok(()); } @@ -283,17 +291,35 @@ impl Endpoint { conn.set_queues(self.queues.clone()); trace!( "{} create a server connection {:?}", + &self.trace_id, conn.trace_id(), - &self.trace_id ); self.handler.on_conn_created(conn); conn.mark_tickable(true); conn.recv(buf, info).map(|_| ())?; + + // Check and delivery buffered ZeroRTT Packets to the conn. + if let Some(mut v) = self.buffer.del(&hdr.dcid) { + trace!( + "{} try to delivery {} buffered zerortt packets to connection {:?}", + &self.trace_id, + v.len(), + conn.trace_id(), + ); + for (ref mut buf, info) in v.iter_mut() { + conn.recv(buf, info).map(|_| ())?; + } + } } return Ok(()); } + // Try to buffer ZeroRTT packets for the unknown connection on the server + if hdr.pkt_type == PacketType::ZeroRTT && !self.closed { + self.buffer.add(hdr.dcid, buf.to_vec(), *info); + } + // Send the Stateless Reset packet for the unknown connection if hdr.pkt_type == PacketType::OneRTT && !hdr.dcid.is_empty() && self.config.stateless_reset { @@ -962,6 +988,41 @@ impl ConnectionRoutes { } } +const MAX_ZERORTT_PACKETS_PER_CONN: usize = 10; + +/// PacketBuffer is used for buffering early incoming ZeroRTT packets on the server. +/// Buffered packets are indexed by odcid. +struct PacketBuffer { + packets: hashlru::Cache, PacketInfo)>>, +} + +impl PacketBuffer { + fn new(cache_size: usize) -> Self { + Self { + packets: hashlru::Cache::new(cache_size / MAX_ZERORTT_PACKETS_PER_CONN), + } + } + + /// Buffer a ZeroRTT packet for the given connection + fn add(&mut self, dcid: ConnectionId, buffer: Vec, info: PacketInfo) { + if let Some(v) = self.packets.get_mut(&dcid) { + if v.len() < MAX_ZERORTT_PACKETS_PER_CONN { + v.push((buffer, info)); + } + return; + } + + let mut v = Vec::with_capacity(MAX_ZERORTT_PACKETS_PER_CONN); + v.push((buffer, info)); + self.packets.insert(dcid, v); + } + + /// Remove all packets for the specified connection + fn del(&mut self, dcid: &ConnectionId) -> Option, PacketInfo)>> { + self.packets.remove(dcid) + } +} + const MAX_BUFFER_SIZE: usize = 2048; /// PacketQueue is used for sending out packets in batches. @@ -1075,6 +1136,22 @@ mod tests { /// Run client/server endpoint in two threads. fn run(&mut self, cli_conf: Config, srv_conf: Config, case_conf: CaseConf) -> Result<()> { + self.run_with_packet_filter( + cli_conf, + srv_conf, + case_conf, + Box::new(NoopPacketFilter {}), + ) + } + + /// Run client/server endpoint in two threads. + fn run_with_packet_filter( + &mut self, + cli_conf: Config, + srv_conf: Config, + case_conf: CaseConf, + cli_filter: Box, + ) -> Result<()> { // Exit if client/server thread panic std::panic::set_hook(Box::new(|panic_info| { println!( @@ -1087,8 +1164,9 @@ mod tests { // Prepare client/server sockets and config let mut cli_poll = mio::Poll::new().unwrap(); - let cli_sock = + let mut cli_sock = TestSocket::new(cli_poll.registry(), &case_conf, "CLIENT".into()).unwrap(); + cli_sock.set_filter(cli_filter); let cli_addr = cli_sock.local_addr()?; let cli_case_conf = case_conf.clone(); let cli_stop = Arc::clone(&self.stop); @@ -1309,13 +1387,19 @@ mod tests { } } + struct TestSocketState { + /// Rng for testing purposes. + rng: StepRng, + + /// Packet filter for outgoing packets + filter: Option>, + } + /// UdpSocket with fault injection. struct TestSocket { socket: mio::net::UdpSocket, - /// Rng for testing purposes. - /// TODO: use custom deterministic rng - rng: RefCell, + state: RefCell, /// Used for simulating packet loss (0~100) packet_loss: u32, @@ -1346,11 +1430,14 @@ mod tests { reg.register(&mut socket, TOKEN, mio::Interest::READABLE) .unwrap(); - let rng = RefCell::new(StepRng::new(0, 1)); + let state = RefCell::new(TestSocketState { + rng: StepRng::new(0, 1), + filter: None, + }); Ok(Self { socket, - rng, + state, packet_loss: conf.packet_loss, packet_delay: conf.packet_delay, packet_reorder: conf.packet_reorder, @@ -1360,6 +1447,11 @@ mod tests { }) } + /// Set the customized packter filter + fn set_filter(&mut self, filter: Box) { + self.state.borrow_mut().filter = Some(filter); + } + /// Return the local socket address. fn local_addr(&self) -> std::io::Result { self.socket.local_addr() @@ -1367,7 +1459,7 @@ mod tests { /// Whether an abnormal event should be injected. fn sampling(&self, rate: u32) -> bool { - self.rng.borrow_mut().next_u32() % 100 < rate + self.state.borrow_mut().rng.next_u32() % 100 < rate } /// Filter packets which are delayed long enough. @@ -1416,7 +1508,7 @@ mod tests { while start < pkts.len() { let end = cmp::min(start + window, pkts.len()); let range = &mut pkts[start..end]; - range.shuffle(&mut *self.rng.borrow_mut()); + range.shuffle(&mut self.state.borrow_mut().rng); start = end; } trace!( @@ -1479,8 +1571,12 @@ mod tests { fn on_packets_send(&self, pkts: &[(Vec, PacketInfo)]) -> crate::Result { let mut count = 0; - // Simulate event of packet delay let mut pkts = pkts.to_vec(); + if let Some(ref mut f) = &mut self.state.borrow_mut().filter { + f.filter(&mut pkts); + } + + // Simulate event of packet delay pkts = self.try_delay_packets(pkts); if pkts.is_empty() { return Ok(0); @@ -1519,6 +1615,43 @@ mod tests { } } + trait PacketFilter { + fn filter(&mut self, pkts: &mut Vec<(Vec, PacketInfo)>); + } + + struct NoopPacketFilter {} + + impl PacketFilter for NoopPacketFilter { + fn filter(&mut self, _pkts: &mut Vec<(Vec, PacketInfo)>) {} + } + + struct FirstPacketFilter { + count: u64, + drop_or_disorder: bool, + } + + impl FirstPacketFilter { + fn new(drop_or_disorder: bool) -> Self { + Self { + count: 0, + drop_or_disorder, + } + } + } + + impl PacketFilter for FirstPacketFilter { + fn filter(&mut self, pkts: &mut Vec<(Vec, PacketInfo)>) { + if self.count == 0 { + if self.drop_or_disorder && pkts.len() >= 1 { + pkts.remove(0); + } else if pkts.len() >= 2 { + pkts.swap(0, 1); + } + } + self.count += pkts.len() as u64; + } + } + // A mocked socket which implements PacketSendHandler. struct MockSocket { packets: RefCell, PacketInfo)>>, @@ -2297,6 +2430,31 @@ mod tests { Ok(()) } + #[test] + fn endpoint_client_recv_invalid_initial() -> Result<()> { + let sock = Rc::new(MockSocket::new()); + let mut conf = TestPair::new_test_config(false)?; + conf.enable_stateless_reset(false); + + let mut e = Endpoint::new( + Box::new(conf), + false, + Box::new(ClientHandler::new( + CaseConf::default(), + Arc::new(AtomicBool::new(false)), + )), + sock.clone(), + ); + let info = TestTool::new_test_packet_info(false); + + // Client recv an Initial with ClientHello message + let mut initial = TEST_INITIAL.clone(); + e.recv(&mut initial, &info)?; + assert_eq!(e.conns.len(), 0); + + Ok(()) + } + #[test] fn endpoint_conn_raw_pointer_stability() -> Result<()> { let cli_addr: SocketAddr = "127.8.8.8:8888".parse().unwrap(); @@ -2365,6 +2523,46 @@ mod tests { Ok(()) } + #[test] + fn transfer_single_stream_0rtt_with_initial_lost() -> Result<()> { + let mut t = TestPair::new(); + + let cli_conf = TestPair::new_test_config(false)?; + let srv_conf = TestPair::new_test_config(true)?; + + let mut case_conf = CaseConf::default(); + case_conf.session = Some(TestPair::new_test_session_state()); + case_conf.client_0rtt_expected = true; + case_conf.resumption_expected = true; + case_conf.request_num = 1; + case_conf.request_size = 1024 * 16; + + // Drop the first packet(Initial) sent by the client + let filter = Box::new(FirstPacketFilter::new(true)); + t.run_with_packet_filter(cli_conf, srv_conf, case_conf, filter)?; + Ok(()) + } + + #[test] + fn transfer_single_stream_0rtt_with_initial_disordered() -> Result<()> { + let mut t = TestPair::new(); + + let cli_conf = TestPair::new_test_config(false)?; + let srv_conf = TestPair::new_test_config(true)?; + + let mut case_conf = CaseConf::default(); + case_conf.session = Some(TestPair::new_test_session_state()); + case_conf.client_0rtt_expected = true; + case_conf.resumption_expected = true; + case_conf.request_num = 1; + case_conf.request_size = 1024 * 16; + + // Disorder the first packet(Initial) sent by the client + let filter = Box::new(FirstPacketFilter::new(false)); + t.run_with_packet_filter(cli_conf, srv_conf, case_conf, filter)?; + Ok(()) + } + #[test] fn transfer_single_stream_0rtt_reject() -> Result<()> { let mut t = TestPair::new(); @@ -2447,6 +2645,20 @@ mod tests { Ok(()) } + #[test] + fn transfer_single_stream_dummy_with_packet_loss() -> Result<()> { + let mut t = TestPair::new(); + + let mut case_conf = CaseConf::default(); + case_conf.request_num = 1; + case_conf.request_size = 1024 * 16; + case_conf.packet_loss = 1; + case_conf.cc_algor = CongestionControlAlgorithm::Dummy; + + t.run_with_test_config(case_conf)?; + Ok(()) + } + #[test] fn transfer_single_stream_with_packet_delay() -> Result<()> { let mut t = TestPair::new(); diff --git a/src/ffi.rs b/src/ffi.rs index e943bfba..22f31a35 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -165,24 +165,32 @@ pub extern "C" fn quic_config_set_send_udp_payload_size(config: &mut Config, v: /// Set the `initial_max_data` transport parameter. It means the initial /// value for the maximum amount of data that can be sent on the connection. +/// The value is capped by the setting `max_connection_window`. +/// The default value is `10485760`. #[no_mangle] pub extern "C" fn quic_config_set_initial_max_data(config: &mut Config, v: u64) { config.set_initial_max_data(v); } /// Set the `initial_max_stream_data_bidi_local` transport parameter. +/// The value is capped by the setting `max_stream_window`. +/// The default value is `5242880`. #[no_mangle] pub extern "C" fn quic_config_set_initial_max_stream_data_bidi_local(config: &mut Config, v: u64) { config.set_initial_max_stream_data_bidi_local(v); } /// Set the `initial_max_stream_data_bidi_remote` transport parameter. +/// The value is capped by the setting `max_stream_window`. +/// The default value is `2097152`. #[no_mangle] pub extern "C" fn quic_config_set_initial_max_stream_data_bidi_remote(config: &mut Config, v: u64) { config.set_initial_max_stream_data_bidi_remote(v); } /// Set the `initial_max_stream_data_uni` transport parameter. +/// The value is capped by the setting `max_stream_window`. +/// The default value is `1048576`. #[no_mangle] pub extern "C" fn quic_config_set_initial_max_stream_data_uni(config: &mut Config, v: u64) { config.set_initial_max_stream_data_uni(v); @@ -283,12 +291,15 @@ pub extern "C" fn quic_config_set_multipath_algorithm(config: &mut Config, v: Mu } /// Set the maximum size of the connection flow control window. +/// The default value is MAX_CONNECTION_WINDOW (15 MB). #[no_mangle] pub extern "C" fn quic_config_set_max_connection_window(config: &mut Config, v: u64) { config.set_max_connection_window(v); } /// Set the maximum size of the stream flow control window. +/// The value should not be greater than the setting `max_connection_window`. +/// The default value is MAX_STREAM_WINDOW (6 MB). #[no_mangle] pub extern "C" fn quic_config_set_max_stream_window(config: &mut Config, v: u64) { config.set_max_stream_window(v); @@ -387,6 +398,13 @@ pub extern "C" fn quic_config_set_send_batch_size(config: &mut Config, v: u16) { config.set_send_batch_size(v as usize); } +/// Set the buffer size for disordered zerortt packets on the server. +/// Applicable to Server only. +#[no_mangle] +pub extern "C" fn quic_config_set_zerortt_buffer_size(config: &mut Config, v: u16) { + config.set_zerortt_buffer_size(v as usize); +} + /// Create a new TlsConfig. /// The caller is responsible for the memory of the TlsConfig and should properly /// destroy it by calling `quic_tls_config_free`. diff --git a/src/h3/connection.rs b/src/h3/connection.rs index 34071e4f..a5b182dd 100644 --- a/src/h3/connection.rs +++ b/src/h3/connection.rs @@ -3309,6 +3309,9 @@ mod tests { assert_eq!(s.server_poll(), Ok((stream_id, headers_event))); assert_eq!(s.server_poll(), Ok((stream_id, Http3Event::Finished))); assert_eq!(s.server_poll(), Err(Http3Error::Done)); + + // Server send MAX_DATA + s.move_forward().unwrap(); } // 4. Server send response headers with FIN for stream 0, 4, 8. diff --git a/src/lib.rs b/src/lib.rs index 2f5deb2f..c92adbce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -350,6 +350,9 @@ pub struct Config { /// Maximum numbers of packets sent in a batch. send_batch_size: usize, + /// Buffer size for early incoming zero rtt packets, in packets. + zerortt_buffer_size: usize, + /// Configurations about loss recovery, congestion control, and pmtu discovery. recovery: RecoveryConfig, @@ -403,6 +406,7 @@ impl Config { cid_len: 8, anti_amplification_factor: ANTI_AMPLIFICATION_FACTOR, send_batch_size: 64, + zerortt_buffer_size: 1000, recovery: RecoveryConfig::default(), multipath: MultipathConfig::default(), tls_config_selector: None, @@ -442,29 +446,34 @@ impl Config { /// Set the `initial_max_data` transport parameter. It means the initial /// value for the maximum amount of data that can be sent on the connection. + /// The value is capped by the setting `max_connection_window`. /// The default value is `10485760`. pub fn set_initial_max_data(&mut self, v: u64) { - self.local_transport_params.initial_max_data = cmp::min(v, VINT_MAX); + self.local_transport_params.initial_max_data = cmp::min(v, self.max_connection_window); } /// Set the `initial_max_stream_data_bidi_local` transport parameter. + /// The value is capped by the setting `max_stream_window`. /// The default value is `5242880`. pub fn set_initial_max_stream_data_bidi_local(&mut self, v: u64) { self.local_transport_params - .initial_max_stream_data_bidi_local = cmp::min(v, VINT_MAX); + .initial_max_stream_data_bidi_local = cmp::min(v, self.max_stream_window); } /// Set the `initial_max_stream_data_bidi_remote` transport parameter. + /// The value is capped by the setting `max_stream_window`. /// The default value is `2097152`. pub fn set_initial_max_stream_data_bidi_remote(&mut self, v: u64) { self.local_transport_params - .initial_max_stream_data_bidi_remote = cmp::min(v, VINT_MAX); + .initial_max_stream_data_bidi_remote = cmp::min(v, self.max_stream_window); } /// Set the `initial_max_stream_data_uni` transport parameter. + /// The value is capped by the setting `max_stream_window`. /// The default value is `1048576`. pub fn set_initial_max_stream_data_uni(&mut self, v: u64) { - self.local_transport_params.initial_max_stream_data_uni = cmp::min(v, VINT_MAX); + self.local_transport_params.initial_max_stream_data_uni = + cmp::min(v, self.max_stream_window); } /// Set the `initial_max_streams_bidi` transport parameter. @@ -555,15 +564,16 @@ impl Config { } /// Set the maximum size of the connection flow control window. - /// The default value is MAX_CONNECTION_WINDOW. + /// The default value is MAX_CONNECTION_WINDOW (15 MB). pub fn set_max_connection_window(&mut self, v: u64) { - self.max_connection_window = v; + self.max_connection_window = cmp::min(v, VINT_MAX); } /// Set the maximum size of the stream flow control window. - /// The default value is MAX_STREAM_WINDOW. + /// The value should not be greater than the setting `max_connection_window`. + /// The default value is MAX_STREAM_WINDOW (6 MB). pub fn set_max_stream_window(&mut self, v: u64) { - self.max_stream_window = v; + self.max_stream_window = cmp::min(v, VINT_MAX); } /// Set the maximum number of concurrent connections. @@ -636,6 +646,12 @@ impl Config { self.send_batch_size = cmp::max(v, 1); } + /// Set the buffer size for disordered zerortt packets on the server. + /// Applicable to Server only. + pub fn set_zerortt_buffer_size(&mut self, v: usize) { + self.zerortt_buffer_size = v; + } + /// Set TLS config. pub fn set_tls_config(&mut self, tls_config: tls::TlsConfig) { self.set_tls_config_selector(Arc::new(tls::DefaultTlsConfigSelector { diff --git a/tools/Cargo.toml b/tools/Cargo.toml index 1c9f3d35..3949ef4f 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic_tools" -version = "0.11.0" +version = "0.12.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" @@ -23,7 +23,7 @@ rand = "0.8.5" statrs = "0.16" jemallocator = { version = "0.5", package = "tikv-jemallocator" } signal-hook = "0.3.17" -tquic = { path = "..", version = "0.11.0"} +tquic = { path = "..", version = "0.12.0"} [lib] crate-type = ["lib"] diff --git a/tools/src/bin/tquic_client.rs b/tools/src/bin/tquic_client.rs index 950c3294..bfcef5b2 100644 --- a/tools/src/bin/tquic_client.rs +++ b/tools/src/bin/tquic_client.rs @@ -14,7 +14,6 @@ use std::cell::RefCell; use std::cell::RefMut; -use std::cmp; use std::cmp::max; use std::fs::create_dir_all; use std::fs::File; @@ -66,7 +65,6 @@ use tquic::MultipathAlgorithm; use tquic::PacketInfo; use tquic::TlsConfig; use tquic::TransportHandler; -use tquic::TIMER_GRANULARITY; use tquic_tools::ApplicationProto; use tquic_tools::QuicSocket; use tquic_tools::Result; @@ -75,7 +73,7 @@ use tquic_tools::Result; static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; #[derive(Parser, Debug, Clone)] -#[clap(name = "client")] +#[clap(name = "client", version=env!("CARGO_PKG_VERSION"))] pub struct ClientOpt { /// Server's address. #[clap(short, long, value_name = "ADDR")] @@ -611,18 +609,7 @@ impl Worker { break; } - let timeout = self - .endpoint - .timeout() - .map(|v| cmp::max(v, TIMER_GRANULARITY)); - - self.poll.poll(&mut events, timeout)?; - - // Process timeout events - if events.is_empty() { - self.endpoint.on_timeout(Instant::now()); - continue; - } + self.poll.poll(&mut events, self.endpoint.timeout())?; // Process IO events for event in events.iter() { @@ -630,6 +617,11 @@ impl Worker { self.process_read_event(event)?; } } + + // Process timeout events. + // Note: Since `poll()` doesn't clearly tell if there was a timeout when it returns, + // it is up to the endpoint to check for a timeout and deal with it. + self.endpoint.on_timeout(Instant::now()); } self.finish(); @@ -1321,7 +1313,9 @@ impl WorkerHandler { let senders = self.senders.borrow_mut(); let sender = senders.get(&index); if let Some(s) = sender { - if s.request_done == s.option.max_requests_per_conn && !conn.is_closing() { + if s.request_done == s.option.max_requests_per_conn + && !(conn.is_closing() || conn.is_closed()) + { let mut worker_ctx = self.worker_ctx.borrow_mut(); worker_ctx.concurrent_conns -= 1; debug!( diff --git a/tools/src/bin/tquic_server.rs b/tools/src/bin/tquic_server.rs index 021b2aaf..f5f7beff 100644 --- a/tools/src/bin/tquic_server.rs +++ b/tools/src/bin/tquic_server.rs @@ -14,7 +14,6 @@ //! An QUIC server based on the high level endpoint API. -use std::cmp; use std::collections::HashMap; use std::fs::create_dir_all; use std::fs::File; @@ -43,7 +42,6 @@ use tquic::MultipathAlgorithm; use tquic::PacketInfo; use tquic::TlsConfig; use tquic::TransportHandler; -use tquic::TIMER_GRANULARITY; use tquic_tools::ApplicationProto; use tquic_tools::QuicSocket; use tquic_tools::Result; @@ -52,7 +50,7 @@ use tquic_tools::Result; static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; #[derive(Parser, Debug)] -#[clap(name = "server")] +#[clap(name = "server", version=env!("CARGO_PKG_VERSION"))] pub struct ServerOpt { /// Address to listen. #[clap(short, long, default_value = "0.0.0.0:4433", value_name = "ADDR")] @@ -236,6 +234,15 @@ pub struct ServerOpt { /// Batch size for sending packets. #[clap(long, default_value = "16", value_name = "NUM", help_heading = "Misc")] pub send_batch_size: usize, + + /// buffer size for disordered zerortt packets on the server. + #[clap( + long, + default_value = "1000", + value_name = "NUM", + help_heading = "Misc" + )] + pub zerortt_buffer_size: usize, } const MAX_BUF_SIZE: usize = 65536; @@ -271,6 +278,7 @@ impl Server { config.set_cid_len(option.cid_len); config.set_anti_amplification_factor(option.anti_amplification_factor); config.set_send_batch_size(option.send_batch_size); + config.set_zerortt_buffer_size(option.zerortt_buffer_size); config.set_congestion_control_algorithm(option.congestion_control_algor); config.set_initial_congestion_window(option.initial_congestion_window); config.set_min_congestion_window(option.min_congestion_window); @@ -891,10 +899,7 @@ fn main() -> Result<()> { error!("process connections error: {:?}", e); } - let timeout = server - .endpoint - .timeout() - .map(|v| cmp::max(v, TIMER_GRANULARITY)); + let timeout = server.endpoint.timeout(); debug!( "{} wait for io events, timeout: {:?}", server.endpoint.trace_id(), @@ -902,17 +907,16 @@ fn main() -> Result<()> { ); server.poll.poll(&mut events, timeout)?; - // Process timeout events - if events.is_empty() { - server.endpoint.on_timeout(Instant::now()); - continue; - } - // Process IO events for event in events.iter() { if event.is_readable() { server.process_read_event(event)?; } } + + // Process timeout events. + // Note: Since `poll()` doesn't clearly tell if there was a timeout when it returns, + // it is up to the endpoint to check for a timeout and deal with it. + server.endpoint.on_timeout(Instant::now()); } }