From dfe0dbbc70d796fde3c0d5e47bec06f6d41bfe19 Mon Sep 17 00:00:00 2001 From: Tomas Rigaux Date: Thu, 23 Oct 2025 19:11:04 -0400 Subject: [PATCH 1/6] add conference setup --- CMakeLists.txt | 35 +--- include/qperf.hpp | 8 +- include/qperf_pub.hpp | 23 +-- include/qperf_sub.hpp | 25 +-- scripts/run_parallel_subs.sh | 26 ++- src/qperf.cpp | 239 ++++++++++++++++++++++++++++ src/{qperf_pub.cc => qperf_pub.cpp} | 191 +--------------------- src/{qperf_sub.cc => qperf_sub.cpp} | 193 ++-------------------- templates/config-audio.ini | 4 +- 9 files changed, 296 insertions(+), 448 deletions(-) create mode 100644 src/qperf.cpp rename src/{qperf_pub.cc => qperf_pub.cpp} (68%) rename src/{qperf_sub.cc => qperf_sub.cpp} (68%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 674a741..6b9ff51 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,43 +20,22 @@ CPMAddPackage("gh:Quicr/libquicr#main") CPMAddPackage("gh:jarro2783/cxxopts@3.3.1") #=============================================================================# -# Build QPerf Publication executable +# Build QPerf executable #=============================================================================# -add_executable(qperf_pub src/qperf_pub.cc) -target_link_libraries(qperf_pub PRIVATE quicr cxxopts spdlog::spdlog) -target_include_directories(qperf_pub PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include) +add_executable(qperf src/qperf.cpp src/qperf_pub.cpp src/qperf_sub.cpp) +target_link_libraries(qperf PRIVATE quicr cxxopts spdlog::spdlog) +target_include_directories(qperf PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include) -target_compile_options(qperf_pub PRIVATE +target_compile_options(qperf PRIVATE $<$,$,$>: -Wpedantic -Wextra -Wall> $<$: > ) -set_target_properties(qperf_pub PROPERTIES +set_target_properties(qperf PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED YES CXX_EXTENSIONS OFF ) -target_compile_definitions(qperf_pub PRIVATE SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_DEBUG) - -#=============================================================================# -# Build QPerf Subscription executable -#=============================================================================# - -add_executable(qperf_sub src/qperf_sub.cc) -target_link_libraries(qperf_sub PRIVATE quicr cxxopts spdlog::spdlog) -target_include_directories(qperf_sub PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include) - -target_compile_options(qperf_sub PRIVATE - $<$,$,$>: -Wpedantic -Wextra -Wall> - $<$: > -) - -set_target_properties(qperf_sub PROPERTIES - CXX_STANDARD 20 - CXX_STANDARD_REQUIRED YES - CXX_EXTENSIONS OFF -) - -target_compile_definitions(qperf_sub PRIVATE SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_DEBUG) +target_compile_definitions(qperf PRIVATE SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_DEBUG) diff --git a/include/qperf.hpp b/include/qperf.hpp index af7cdc3..700e56b 100644 --- a/include/qperf.hpp +++ b/include/qperf.hpp @@ -65,7 +65,10 @@ namespace qperf { return { quicr::TrackNamespace{ track_namespace }, { track_name.begin(), track_name.end() } }; } - static bool PopulateScenarioFields(const std::string section_name, ini::IniFile& inif, PerfConfig& perf_config) + static bool PopulateScenarioFields(const std::string section_name, + std::uint32_t instance_id, + ini::IniFile& inif, + PerfConfig& perf_config) { bool parsed = false; std::string scenario_namespace = ""; @@ -73,7 +76,8 @@ namespace qperf { perf_config.test_name = section_name; - scenario_namespace = inif[section_name]["namespace"].as(); + scenario_namespace = + std::vformat(inif[section_name]["namespace"].as(), std::make_format_args(instance_id)); scenario_name = inif[section_name]["name"].as(); perf_config.full_track_name = MakeFullTrackName(scenario_namespace, scenario_name); diff --git a/include/qperf_pub.hpp b/include/qperf_pub.hpp index 222364d..e80efa8 100644 --- a/include/qperf_pub.hpp +++ b/include/qperf_pub.hpp @@ -14,7 +14,9 @@ namespace qperf { PerfPublishTrackHandler(const PerfConfig&); public: - static auto Create(const std::string& section_name, ini::IniFile& inif); + static std::shared_ptr Create(const std::string& section_name, + ini::IniFile& inif, + std::uint32_t instance_id); void StatusChanged(Status status) override; void MetricsSampled(const quicr::PublishTrackMetrics& metrics) override; @@ -43,23 +45,4 @@ namespace qperf { qperf::TestMetrics test_metrics_; std::mutex mutex_; }; - - class PerfPubClient : public quicr::Client - { - public: - PerfPubClient(const quicr::ClientConfig& cfg, const std::string& configfile); - void StatusChanged(Status status) override; - void MetricsSampled(const quicr::ConnectionMetrics&) override; - bool GetTerminateStatus(); - bool HandlersComplete(); - void Terminate(); - - private: - bool terminate_; - std::string configfile_; - ini::IniFile inif_; - std::vector> track_handlers_; - std::mutex track_handlers_mutex_; - }; - } // namespace qperf diff --git a/include/qperf_sub.hpp b/include/qperf_sub.hpp index 6a584d4..57ed0ab 100644 --- a/include/qperf_sub.hpp +++ b/include/qperf_sub.hpp @@ -13,7 +13,9 @@ namespace qperf { PerfSubscribeTrackHandler(const PerfConfig& perf_config, std::uint32_t test_identifier); public: - static auto Create(const std::string& section_name, ini::IniFile& inif, std::uint32_t test_identifier); + static std::shared_ptr Create(const std::string& section_name, + ini::IniFile& inif, + std::uint32_t test_identifier); void ObjectReceived(const quicr::ObjectHeaders&, quicr::BytesSpan) override; void StatusChanged(Status status) override; void MetricsSampled(const quicr::SubscribeTrackMetrics& metrics) override; @@ -56,25 +58,4 @@ namespace qperf { std::int64_t total_arrival_delta_; }; - class PerfSubClient : public quicr::Client - { - public: - PerfSubClient(const quicr::ClientConfig& cfg, const std::string& configfile, std::uint32_t test_identifier); - void StatusChanged(Status status) override; - void MetricsSampled(const quicr::ConnectionMetrics&) override {} - - bool HandlersComplete(); - void Terminate(); - - private: - bool terminate_; - std::string configfile_; - ini::IniFile inif_; - std::uint32_t test_identifier_; - - std::vector> track_handlers_; - - std::mutex track_handlers_mutex_; - }; - } // namespace diff --git a/scripts/run_parallel_subs.sh b/scripts/run_parallel_subs.sh index 8151dcc..eec8d27 100755 --- a/scripts/run_parallel_subs.sh +++ b/scripts/run_parallel_subs.sh @@ -3,13 +3,13 @@ LOGS_DIR=qperf_logs if [ -z "$1" ]; then - echo "Using default number of subscriber clients" - NUM_SUBS=${1:-100} + echo "Using default number of conferences" + CONFERENCES=${1:-100} elif [ "$1" -eq 0 ]; then - echo "Num subscribers must be greater than 0" + echo "Num conferences must be greater than 0" exit 1 else - NUM_SUBS=${1:-$1} # Arg 1 + CONFERENCES=${1:-$1} fi if [ -z "$2" ]; then @@ -25,7 +25,21 @@ else CONFIG_PATH="$3" fi -echo "Running $NUM_SUBS subscriber clients" +if [ -z "$4" ]; then + echo "Using default number of clients" + INSTANCES=${1:-100} +elif [ "$4" -eq 0 ]; then + echo "Num clients must be greater than 0" + exit 1 +else + INSTANCES=${4:-$4} +fi +echo "Running $CONFERENCES conferences with $INSTANCES clients each" + +rm -rf $LOGS_DIR mkdir -p $LOGS_DIR -parallel -j ${NUM_SUBS} "./qperf_sub -i {} -c $CONFIG_PATH --connect_uri $RELAY > $LOGS_DIR/t_{}logs.txt 2>&1" ::: $(seq ${NUM_SUBS}) + +for conference_id in $(seq 1 $CONFERENCES); do + parallel -j ${INSTANCES} "./qperf --conference_id $conference_id -i {} -n $INSTANCES -c $CONFIG_PATH --connect_uri $RELAY > $LOGS_DIR/t_$conference_id{}logs.txt 2>&1 &" ::: $(seq ${INSTANCES}) +done diff --git a/src/qperf.cpp b/src/qperf.cpp new file mode 100644 index 0000000..fc91933 --- /dev/null +++ b/src/qperf.cpp @@ -0,0 +1,239 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025 Cisco Systems +// SPDX-License-Identifier: BSD-2-Clause + +#include "qperf_pub.hpp" +#include "qperf_sub.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace qperf; + +class PerfClient : public quicr::Client +{ + public: + PerfClient(const quicr::ClientConfig& cfg, + const std::string& configfile, + std::uint32_t conference_id, + std::uint32_t instances, + std::uint32_t instance_identifier) + : quicr::Client(cfg) + , configfile_(configfile) + , conference_id_(conference_id) + , instance_id_(instance_identifier) + , instances_(instances) + { + } + + void StatusChanged(Status status) + { + switch (status) { + case Status::kReady: + SPDLOG_INFO("Client status - kReady"); + inif_.load(configfile_); + + for (const auto& [section_name, _] : inif_) { + auto pub_handler = pub_track_handlers_.emplace_back( + PerfPublishTrackHandler::Create(section_name, inif_, instance_id_ + (conference_id_ * 1000))); + PublishTrack(pub_handler); + } + + for (std::uint32_t i = 1; i <= instances_; ++i) { + if (i == instance_id_) { + continue; + } + + for (const auto& [section_name, _] : inif_) { + auto sub_handler = sub_track_handlers_.emplace_back( + PerfSubscribeTrackHandler::Create(section_name, inif_, i + (conference_id_ * 1000))); + SubscribeTrack(sub_handler); + } + } + + break; + case Status::kNotReady: + SPDLOG_INFO("Client status - kNotReady"); + break; + case Status::kConnecting: + SPDLOG_INFO("Client status - kConnecting"); + break; + case Status::kNotConnected: + SPDLOG_INFO("Client status - kNotConnected"); + break; + case Status::kPendingServerSetup: + SPDLOG_INFO("Client status - kPendingSeverSetup"); + break; + + case Status::kFailedToConnect: + SPDLOG_ERROR("Client status - kFailedToConnect"); + terminate_ = true; + break; + case Status::kInternalError: + SPDLOG_ERROR("Client status - kInternalError"); + terminate_ = true; + break; + case Status::kInvalidParams: + SPDLOG_ERROR("Client status - kInvalidParams"); + terminate_ = true; + break; + default: + SPDLOG_ERROR("Connection failed {0}", static_cast(status)); + terminate_ = true; + break; + } + } + + bool HandlersComplete() + { + std::lock_guard _(mutex_); + defer(std::this_thread::sleep_for(std::chrono::milliseconds(100))); + + if (sub_track_handlers_.empty() || pub_track_handlers_.empty()) { + return false; + } + + for (auto handler : pub_track_handlers_) { + if (!handler->IsComplete()) { + return false; + } + } + + for (auto handler : sub_track_handlers_) { + if (!handler->IsComplete()) { + return false; + } + } + + return true; + } + + void Terminate() + { + std::lock_guard _(mutex_); + + for (auto handler : sub_track_handlers_) { + SPDLOG_INFO("unsubscribe track {}", handler->TestName()); + UnsubscribeTrack(handler); + } + + for (auto handler : pub_track_handlers_) { + handler->StopWriter(); + UnpublishTrack(handler); + } + + terminate_ = true; + } + + private: + bool terminate_; + std::string configfile_; + ini::IniFile inif_; + std::uint32_t conference_id_; + std::uint32_t instance_id_; + std::uint32_t instances_; + + std::vector> sub_track_handlers_; + std::vector> pub_track_handlers_; + + std::mutex mutex_; +}; + +std::atomic_bool terminate = false; + +void +HandleTerminateSignal(int) +{ + terminate = true; +} + +int +main(int argc, char** argv) +{ + // clang-format off + cxxopts::Options options("QPerf"); + options.add_options() + ("endpoint_id", "Name of the client", cxxopts::value()->default_value("perf@cisco.com")) + ("connect_uri", "Relay to connect to", cxxopts::value()->default_value("moq://localhost:1234")) + ("n,instances", "Number of instances being run", cxxopts::value()->default_value("1")) + ("conference_id", "Conference identifier", cxxopts::value()->default_value("1")) + ("i,instance_id", "Instance identifier number", cxxopts::value()->default_value("1")) + ("c,config", "Scenario config file", cxxopts::value()) + ("h,help", "Print usage"); + // clang-format on + + cxxopts::ParseResult result; + + try { + result = options.parse(argc, argv); + } catch (const cxxopts::exceptions::exception& e) { + std::cerr << "Caught exception while parsing arguments: " << e.what() << std::endl; + return EXIT_FAILURE; + } + + if (result.count("help")) { + std::cerr << options.help() << std::endl; + return EXIT_SUCCESS; + } + + quicr::TransportConfig config; + config.tls_cert_filename = ""; + config.tls_key_filename = ""; + config.time_queue_max_duration = 5000; + config.use_reset_wait_strategy = false; + config.quic_qlog_path = ""; + + auto endpoint_instance_id = + result["endpoint_id"].as() + ":" + std::to_string(result["instance_id"].as()); + + quicr::ClientConfig client_config; + client_config.connect_uri = result["connect_uri"].as(); + client_config.endpoint_id = endpoint_instance_id; + client_config.metrics_sample_ms = 5000; + client_config.transport_config = config; + client_config.tick_service_sleep_delay_us = 50'000; + + auto log_id = endpoint_instance_id; + + const auto logger = spdlog::stderr_color_mt(log_id); + + const auto conference_id = result["conference_id"].as(); + const auto instance_id = result["instance_id"].as(); + const auto instances = result["instances"].as(); + + auto client = std::make_shared( + client_config, result["config"].as(), conference_id, instances, instance_id); + + std::signal(SIGINT, HandleTerminateSignal); + + try { + client->Connect(); + } catch (const std::exception& e) { + SPDLOG_LOGGER_CRITICAL( + logger, "Failed to connect to relay '{0}' with exception: {1}", client_config.connect_uri, e.what()); + return EXIT_FAILURE; + } catch (...) { + SPDLOG_LOGGER_CRITICAL(logger, "Unexpected error connecting to relay"); + return EXIT_FAILURE; + } + + while (!terminate && !client->HandlersComplete()) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + client->Terminate(); + client->Disconnect(); + + return EXIT_SUCCESS; +} diff --git a/src/qperf_pub.cc b/src/qperf_pub.cpp similarity index 68% rename from src/qperf_pub.cc rename to src/qperf_pub.cpp index de171d3..c76e5e9 100644 --- a/src/qperf_pub.cc +++ b/src/qperf_pub.cpp @@ -1,16 +1,16 @@ // SPDX-FileCopyrightText: Copyright (c) 2025 Cisco Systems // SPDX-License-Identifier: BSD-2-Clause +#include "qperf_pub.hpp" +#include "qperf.hpp" + #include #include #include #include #include -#include "qperf_pub.hpp" - #include -#include #include #include @@ -28,10 +28,12 @@ namespace qperf { memset(&test_metrics_, '\0', sizeof(test_metrics_)); } - auto PerfPublishTrackHandler::Create(const std::string& section_name, ini::IniFile& inif) + std::shared_ptr PerfPublishTrackHandler::Create(const std::string& section_name, + ini::IniFile& inif, + std::uint32_t instance_id) { PerfConfig perf_config; - PopulateScenarioFields(section_name, inif, perf_config); + PopulateScenarioFields(section_name, instance_id, inif, perf_config); return std::shared_ptr(new PerfPublishTrackHandler(perf_config)); } @@ -316,183 +318,4 @@ namespace qperf { write_thread_.join(); } } - - PerfPubClient::PerfPubClient(const quicr::ClientConfig& cfg, const std::string& configfile) - : quicr::Client(cfg) - , configfile_(configfile) - { - } - - void PerfPubClient::StatusChanged(Status status) - { - std::lock_guard _(track_handlers_mutex_); - switch (status) { - case Status::kReady: - SPDLOG_INFO("PerfPubClient - kReady"); - inif_.load(configfile_); - for (const auto& section_pair : inif_) { - const std::string& section_name = section_pair.first; - auto pub_handler = - track_handlers_.emplace_back(PerfPublishTrackHandler::Create(section_name, inif_)); - PublishTrack(pub_handler); - } - break; - - case Status::kNotReady: - SPDLOG_INFO("PerfPubClient - kNotReady"); - break; - case Status::kConnecting: - SPDLOG_INFO("PerfPubClient - kConnecting"); - break; - case Status::kDisconnecting: - SPDLOG_INFO("PerfPubClient - kDisconnecting"); - break; - case Status::kPendingServerSetup: - SPDLOG_INFO("PerfPubClient - kPendingSeverSetup"); - break; - - // All of the rest of these are 'errors' and will set terminate_. - case Status::kInternalError: - SPDLOG_INFO("PerfPubClient - kInternalError - terminate"); - terminate_ = true; - break; - case Status::kInvalidParams: - SPDLOG_INFO("PerfPubClient - kInvalidParams - terminate"); - terminate_ = true; - break; - case Status::kNotConnected: - SPDLOG_INFO("PerfPubClient - kNotConnected - terminate"); - terminate_ = true; - break; - case Status::kFailedToConnect: - SPDLOG_INFO("PerfPubClient - kFailedToConnect - terminate"); - terminate_ = true; - break; - default: - SPDLOG_INFO("PerfPubClient - UNKNOWN - Connection failed {0}", static_cast(status)); - terminate_ = true; - break; - } - } - - void PerfPubClient::MetricsSampled(const quicr::ConnectionMetrics&) {} - - bool PerfPubClient::GetTerminateStatus() - { - return terminate_; - } - - bool PerfPubClient::HandlersComplete() - { - std::lock_guard _(track_handlers_mutex_); - bool ret = true; - // Don't like this - should be dependent on a 'state' - if (track_handlers_.size() > 0) { - for (auto handler : track_handlers_) { - if (!handler->IsComplete()) { - ret = false; - break; - } - } - } else { - ret = false; - } - return ret; - } - - void PerfPubClient::Terminate() - { - std::lock_guard _(track_handlers_mutex_); - for (auto handler : track_handlers_) { - // Stop the handler writer thread... - handler->StopWriter(); - // Unpublish the track - UnpublishTrack(handler); - } - // we are done - terminate_ = true; - } } // namespace qperf - -bool terminate = false; - -void -HandleTerminateSignal(int) -{ - terminate = true; -} - -int -main(int argc, char** argv) -{ - // clang-format off - cxxopts::Options options("QPerf"); - options.add_options() - ("endpoint_id", "Name of the client", cxxopts::value()->default_value("perf@cisco.com")) - ("connect_uri", "Relay to connect to", cxxopts::value()->default_value("moq://localhost:1234")) - ("c,config", "Scenario config file", cxxopts::value()->default_value("./config.ini")) - ("h,help", "Print usage"); - // clang-format on - - cxxopts::ParseResult result; - - try { - result = options.parse(argc, argv); - } catch (const cxxopts::exceptions::exception& e) { - std::cerr << "Caught exception while parsing arguments: " << e.what() << std::endl; - return EXIT_FAILURE; - } - - if (result.count("help")) { - std::cerr << options.help() << std::endl; - return EXIT_SUCCESS; - } - - quicr::TransportConfig config; - config.tls_cert_filename = ""; - config.tls_key_filename = ""; - config.time_queue_max_duration = 5000; - config.use_reset_wait_strategy = false; - config.quic_qlog_path = ""; - - quicr::ClientConfig client_config; - client_config.endpoint_id = result["endpoint_id"].as(); - client_config.metrics_sample_ms = 5000; - client_config.transport_config = config; - client_config.connect_uri = result["connect_uri"].as(); - client_config.tick_service_sleep_delay_us = 50000; - - const auto logger = spdlog::stderr_color_mt("PERF"); - - auto config_file = result["config"].as(); - SPDLOG_INFO("--------------------------------------------"); - SPDLOG_INFO("Starting...pub"); - SPDLOG_INFO("\tconfig file {}", config_file); - SPDLOG_INFO("\tclient config:"); - SPDLOG_INFO("\t\tconnect_uri = {}", client_config.connect_uri); - SPDLOG_INFO("\t\tendpoint = {}", client_config.endpoint_id); - SPDLOG_INFO("--------------------------------------------"); - - std::signal(SIGINT, HandleTerminateSignal); - - auto client = std::make_shared(client_config, config_file); - - try { - client->Connect(); - } catch (const std::exception& e) { - SPDLOG_LOGGER_CRITICAL( - logger, "Failed to connect to relay '{0}' with exception: {1}", client_config.connect_uri, e.what()); - return EXIT_FAILURE; - } catch (...) { - SPDLOG_LOGGER_CRITICAL(logger, "Unexpected error connecting to relay"); - return EXIT_FAILURE; - } - - while (!terminate && !client->HandlersComplete()) { - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - - client->Terminate(); - client->Disconnect(); - return EXIT_SUCCESS; -} diff --git a/src/qperf_sub.cc b/src/qperf_sub.cpp similarity index 68% rename from src/qperf_sub.cc rename to src/qperf_sub.cpp index a9c917c..9d1a6d3 100644 --- a/src/qperf_sub.cc +++ b/src/qperf_sub.cpp @@ -1,6 +1,9 @@ // SPDX-FileCopyrightText: Copyright (c) 2025 Cisco Systems // SPDX-License-Identifier: BSD-2-Clause +#include "qperf_sub.hpp" +#include "qperf.hpp" + #include #include #include @@ -10,15 +13,12 @@ #include #include #include -#include #include #include #include #include #include -#include "qperf_sub.hpp" - namespace qperf { std::atomic_bool terminate = false; @@ -58,13 +58,13 @@ namespace qperf { { } - auto PerfSubscribeTrackHandler::Create(const std::string& section_name, - ini::IniFile& inif, - std::uint32_t test_identifier) + std::shared_ptr PerfSubscribeTrackHandler::Create(const std::string& section_name, + ini::IniFile& inif, + std::uint32_t instance_id) { PerfConfig perf_config; - PopulateScenarioFields(section_name, inif, perf_config); - return std::shared_ptr(new PerfSubscribeTrackHandler(perf_config, test_identifier)); + PopulateScenarioFields(section_name, instance_id, inif, perf_config); + return std::shared_ptr(new PerfSubscribeTrackHandler(perf_config, instance_id)); } void PerfSubscribeTrackHandler::StatusChanged(Status status) @@ -287,7 +287,7 @@ namespace qperf { if (test_mode_ == qperf::TestMode::kRunning) { std::uint64_t delta_bytes = metrics_.bytes_received - last_bytes_; - std::uint64_t bitrate = ((delta_bytes) * 8) / diff.count(); + std::uint64_t bitrate = ((delta_bytes) * 8) / std::max(diff.count(), std::int64_t(1)); metric_samples_ += 1; bitrate_total_ += bitrate; if (min_bitrate_ == 0) { @@ -313,179 +313,4 @@ namespace qperf { last_metric_time_ = std::chrono::time_point_cast(std::chrono::system_clock::now()); last_bytes_ = metrics.bytes_received; } - - /** - * @brief MoQ client - * @details Implementation of the MoQ Client - */ - - PerfSubClient::PerfSubClient(const quicr::ClientConfig& cfg, - const std::string& configfile, - std::uint32_t test_identifier) - : quicr::Client(cfg) - , configfile_(configfile) - , test_identifier_(test_identifier) - { - } - - void PerfSubClient::StatusChanged(Status status) - { - switch (status) { - case Status::kReady: - SPDLOG_INFO("Client status - kReady"); - inif_.load(configfile_); - for (const auto& section_pair : inif_) { - const std::string& section_name = section_pair.first; - SPDLOG_INFO("Starting test - {}", section_name); - auto sub_handler = track_handlers_.emplace_back( - PerfSubscribeTrackHandler::Create(section_name, inif_, test_identifier_)); - SubscribeTrack(sub_handler); - } - break; - case Status::kNotReady: - SPDLOG_INFO("Client status - kNotReady"); - break; - case Status::kConnecting: - SPDLOG_INFO("Client status - kConnecting"); - break; - case Status::kNotConnected: - SPDLOG_INFO("Client status - kNotConnected"); - break; - case Status::kPendingServerSetup: - SPDLOG_INFO("Client status - kPendingSeverSetup"); - break; - - case Status::kFailedToConnect: - SPDLOG_ERROR("Client status - kFailedToConnect"); - terminate_ = true; - break; - case Status::kInternalError: - SPDLOG_ERROR("Client status - kInternalError"); - terminate_ = true; - break; - case Status::kInvalidParams: - SPDLOG_ERROR("Client status - kInvalidParams"); - terminate_ = true; - break; - default: - SPDLOG_ERROR("Connection failed {0}", static_cast(status)); - terminate_ = true; - break; - } - } - - bool PerfSubClient::HandlersComplete() - { - std::lock_guard _(track_handlers_mutex_); - bool ret = true; - // Don't like this - should be dependent on a 'state' - if (track_handlers_.size() > 0) { - for (auto handler : track_handlers_) { - if (!handler->IsComplete()) { - ret = false; - } - } - } else { - ret = false; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - return ret; - } - - void PerfSubClient::Terminate() - { - std::lock_guard _(track_handlers_mutex_); - for (auto handler : track_handlers_) { - // Unpublish the track - SPDLOG_INFO("unsubscribe track {}", handler->TestName()); - UnsubscribeTrack(handler); - } - // we are done - terminate_ = true; - } - -} - -bool terminate = false; - -void -HandleTerminateSignal(int) -{ - terminate = true; -} - -int -main(int argc, char** argv) -{ - // clang-format off - cxxopts::Options options("QPerf"); - options.add_options() - ("endpoint_id", "Name of the client", cxxopts::value()->default_value("perf@cisco.com")) - ("connect_uri", "Relay to connect to", cxxopts::value()->default_value("moq://localhost:1234")) - ("i,test_id", "Test idenfiter number", cxxopts::value()->default_value("1")) - ("c,config", "Scenario config file", cxxopts::value()) - ("h,help", "Print usage"); - // clang-format on - - cxxopts::ParseResult result; - - try { - result = options.parse(argc, argv); - } catch (const cxxopts::exceptions::exception& e) { - std::cerr << "Caught exception while parsing arguments: " << e.what() << std::endl; - return EXIT_FAILURE; - } - - if (result.count("help")) { - std::cerr << options.help() << std::endl; - return EXIT_SUCCESS; - } - - quicr::TransportConfig config; - config.tls_cert_filename = ""; - config.tls_key_filename = ""; - config.time_queue_max_duration = 5000; - config.use_reset_wait_strategy = false; - config.quic_qlog_path = ""; - - auto endpoint_test_id = - result["endpoint_id"].as() + ":" + std::to_string(result["test_id"].as()); - - quicr::ClientConfig client_config; - client_config.connect_uri = result["connect_uri"].as(); - client_config.endpoint_id = endpoint_test_id; - client_config.metrics_sample_ms = 5000; - client_config.transport_config = config; - client_config.tick_service_sleep_delay_us = 50'000; - - auto log_id = endpoint_test_id; - - const auto logger = spdlog::stderr_color_mt(log_id); - - auto test_identifier = result["test_id"].as(); - - auto client = - std::make_shared(client_config, result["config"].as(), test_identifier); - - std::signal(SIGINT, HandleTerminateSignal); - - try { - client->Connect(); - } catch (const std::exception& e) { - SPDLOG_LOGGER_CRITICAL( - logger, "Failed to connect to relay '{0}' with exception: {1}", client_config.connect_uri, e.what()); - return EXIT_FAILURE; - } catch (...) { - SPDLOG_LOGGER_CRITICAL(logger, "Unexpected error connecting to relay"); - return EXIT_FAILURE; - } - - while (!terminate && !client->HandlersComplete()) { - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - - client->Terminate(); - client->Disconnect(); - - return EXIT_SUCCESS; } diff --git a/templates/config-audio.ini b/templates/config-audio.ini index cc67f21..8420106 100644 --- a/templates/config-audio.ini +++ b/templates/config-audio.ini @@ -1,6 +1,6 @@ [Audio] -namespace = perf/1 ; track namespace -name = 1 ; track name +namespace = perf/{} ; track namespace +name = audio ; track name track_mode = datagram ; track mode {datagram,stream} priority = 2 ; priority (0-255) ttl = 5000 ; ttl in ms From 3f13d6efbea0118104bc1de2109a0da43f2fc21c Mon Sep 17 00:00:00 2001 From: trigaux Date: Wed, 29 Oct 2025 13:37:38 -0400 Subject: [PATCH 2/6] Update and clean up. --- README.md | 52 +++++++++++++++++++++-------------- examples/config-audio.ini | 12 ++++++++ examples/config-av.ini | 25 +++++++++++++++++ include/qperf.hpp | 44 +++++++++++++++-------------- src/qperf.cpp | 6 ++-- src/qperf_pub.cpp | 5 ++-- src/qperf_sub.cpp | 1 - templates/config-audio.ini | 13 --------- templates/config-av.ini | 26 ------------------ templates/config.template.ini | 37 ++++++++----------------- 10 files changed, 108 insertions(+), 113 deletions(-) create mode 100644 examples/config-audio.ini create mode 100644 examples/config-av.ini delete mode 100644 templates/config-audio.ini delete mode 100644 templates/config-av.ini diff --git a/README.md b/README.md index e124060..0df0825 100644 --- a/README.md +++ b/README.md @@ -1,32 +1,35 @@ # qperf -Utility to evaluate relay performance. +Utility to evaluate relay performance. -## Command Line Programs - -### > src/qperf_pub - -**qperf_pub** is a publisher that publishes based on the `-c ` profile. - -### > src/qperf_sub - -**qperf_sub** is a subscriber that consumes the tracks based on the `-c ` profile. - -It is intended that the same `config.ini` file be used for both pub and sub. This way they have -the same settings. - -## config.ini +## Configuration Profile configuration per track is configured in a `config.ini` file that is provided to the program using -the `-c` option. See the [config.ini template](templates/config.template.ini) for an example configuration. +the `-c` option. See the [config.ini template](templates/config.template.ini) for an example configuration. Each section in the `config.ini` defines a test for a publish track and subscribe track. -The `namespace` and `name` together should be **unique** for the section, which is the track. - +The `namespace` and `name` together should be **unique** for the section, which is the track. + +Sections are laid out as follows: + +```ini +[TRACK] ; MUST be unique +namespace = ; MAY be the same across tracks, entries delimited by / +name = ; SHOULD be unique to other tracks +track_mode = ; (datagram|stream) +priority = ; (0-255) +ttl = ; TTL in ms +time_interval = ; transmit interval in floating point ms +objects_per_group = ; number of objects per group >=1 +first_object_size = ; size in bytes of the first object in a group +object_size = ; size in bytes of remaining objects in a group +start_delay = ; start delay in ms - after control messages are sent and acknowledged +total_transmit_time = ; total transmit time in ms +``` > [!IMPORTANT] -> Each section **MUST** not share the same `namespace + name`. If namespace is the same, the name -> should be different. +> Each section **MUST** not share the same `namespace + name` combination. If `namespace` is the same between sections, `name` +> **MUST** be different between sections. ## Building @@ -42,4 +45,11 @@ Build the programs using the following: cmake --build build -j 4 ``` -The binaries will be under `./build` \ No newline at end of file +The binaries will be under `./build` + +## Using + +The `qperf` program uses a config file to build tracks. It builds a conference +client by creating 1 publisher track for every Track section in the config file, +and N - 1 subscriber tracks for every Track section. The client does not subscribe +to its own publisher track. diff --git a/examples/config-audio.ini b/examples/config-audio.ini new file mode 100644 index 0000000..d093dd6 --- /dev/null +++ b/examples/config-audio.ini @@ -0,0 +1,12 @@ +[Audio] +namespace = perf/audio/{} ; MAY be the same across tracks, entries delimited by / +name = 1 ; SHOULD be unique to other tracks +track_mode = datagram ; (datagram|stream) +priority = 2 ; (0-255) +ttl = 5000 ; TTL in ms +time_interval = 20 ; transmit interval in floating point ms +objects_per_group = 1 ; number of objects per group >=1 +first_object_size = 120 ; size in bytes of the first object in a group +object_size = 120 ; size in bytes of remaining objects in a group +start_delay = 5000 ; start delay in ms - after control messages are sent and acknowledged +total_transmit_time = 35000 ; total transmit time in ms - including start delay diff --git a/examples/config-av.ini b/examples/config-av.ini new file mode 100644 index 0000000..f234c14 --- /dev/null +++ b/examples/config-av.ini @@ -0,0 +1,25 @@ +[Audio] +namespace = perf/audio/{} ; MAY be the same across tracks, entries delimited by / +name = 1 ; SHOULD be unique to other tracks +track_mode = datagram ; (datagram|stream) +priority = 2 ; (0-255) +ttl = 5000 ; TTL in ms +time_interval = 20 ; transmit interval in floating point ms +objects_per_group = 1 ; number of objects per group >=1 +first_object_size = 120 ; size in bytes of the first object in a group +object_size = 120 ; size in bytes of remaining objects in a group +start_delay = 5000 ; start delay in ms - after control messages are sent and acknowledged +total_transmit_time = 35000 ; total transmit time in ms + +[360p Video] +namespace = perf/video/{} ; MAY be the same across tracks, entries delimited by / +name = 1 ; SHOULD be unique to other tracks +track_mode = stream ; (datagram|stream) +priority = 3 ; (0-255) +ttl = 5000 ; TTL in ms +time_interval = 33.33 ; transmit interval in floating point ms +objects_per_group = 150 ; number of objects per group >=1 +first_object_size = 21333 ; size in bytes of the first object in a group +object_size = 2666 ; size in bytes of remaining objects in a group +start_delay = 5000 ; start delay in ms - after control messages are sent and acknowledged +total_transmit_time = 35000 ; total transmit time in ms diff --git a/include/qperf.hpp b/include/qperf.hpp index 700e56b..230ed48 100644 --- a/include/qperf.hpp +++ b/include/qperf.hpp @@ -1,9 +1,10 @@ #pragma once -#include +#include "inicpp.h" + #include -#include "inicpp.h" +#include namespace qperf { struct PerfConfig @@ -15,11 +16,11 @@ namespace qperf { uint32_t ttl; double transmit_interval; uint32_t objects_per_group; - uint32_t bytes_per_group_start; - uint32_t bytes_per_group; + uint32_t first_object_size; + uint32_t object_size; uint64_t start_delay; - uint64_t total_test_time; uint64_t total_transmit_time; + uint64_t total_test_time; }; enum class TestMode : uint8_t @@ -65,7 +66,7 @@ namespace qperf { return { quicr::TrackNamespace{ track_namespace }, { track_name.begin(), track_name.end() } }; } - static bool PopulateScenarioFields(const std::string section_name, + inline bool PopulateScenarioFields(const std::string section_name, std::uint32_t instance_id, ini::IniFile& inif, PerfConfig& perf_config) @@ -76,12 +77,13 @@ namespace qperf { perf_config.test_name = section_name; - scenario_namespace = - std::vformat(inif[section_name]["namespace"].as(), std::make_format_args(instance_id)); - scenario_name = inif[section_name]["name"].as(); + auto& section = inif[section_name]; + + scenario_namespace = std::vformat(section["namespace"].as(), std::make_format_args(instance_id)); + scenario_name = section["name"].as(); perf_config.full_track_name = MakeFullTrackName(scenario_namespace, scenario_name); - std::string track_mode_ini_str = inif[section_name]["track_mode"].as(); + std::string track_mode_ini_str = section["track_mode"].as(); if (track_mode_ini_str == "datagram") { perf_config.track_mode = quicr::TrackMode::kDatagram; } else if (track_mode_ini_str == "stream") { @@ -91,15 +93,15 @@ namespace qperf { SPDLOG_WARN("Invalid or missing track mode in scenario. Using default `stream`"); } - perf_config.priority = inif[section_name]["priority"].as(); - perf_config.ttl = inif[section_name]["ttl"].as(); - perf_config.transmit_interval = inif[section_name]["time_interval"].as(); - perf_config.objects_per_group = inif[section_name]["objs_per_group"].as(); - perf_config.bytes_per_group_start = inif[section_name]["bytes_per_group_start"].as(); - perf_config.bytes_per_group = inif[section_name]["bytes_per_group"].as(); - perf_config.start_delay = inif[section_name]["start_delay"].as(); - perf_config.total_test_time = inif[section_name]["total_test_time"].as(); - perf_config.total_transmit_time = perf_config.total_test_time - perf_config.start_delay; + perf_config.priority = section["priority"].as(); + perf_config.ttl = section["ttl"].as(); + perf_config.transmit_interval = section["time_interval"].as(); + perf_config.objects_per_group = section["objects_per_group"].as(); + perf_config.first_object_size = section["first_object_size"].as(); + perf_config.object_size = section["object_size"].as(); + perf_config.start_delay = section["start_delay"].as(); + perf_config.total_transmit_time = section["total_transmit_time"].as(); + perf_config.total_test_time = perf_config.total_transmit_time + perf_config.start_delay; SPDLOG_INFO("--------------------------------------------"); SPDLOG_INFO("Test config:"); @@ -109,8 +111,8 @@ namespace qperf { SPDLOG_INFO(" pri {}", perf_config.priority); SPDLOG_INFO(" ttl {}", perf_config.ttl); SPDLOG_INFO(" objspergroup {}", perf_config.objects_per_group); - SPDLOG_INFO(" bytes per group start {}", perf_config.bytes_per_group_start); - SPDLOG_INFO(" bytes per group {}", perf_config.bytes_per_group); + SPDLOG_INFO(" bytes per group start {}", perf_config.first_object_size); + SPDLOG_INFO(" bytes per group {}", perf_config.object_size); SPDLOG_INFO(" transmit interval {}", perf_config.transmit_interval); SPDLOG_INFO(" start_delay {}", perf_config.start_delay); SPDLOG_INFO(" total test time {}", perf_config.total_test_time); diff --git a/src/qperf.cpp b/src/qperf.cpp index fc91933..76c052f 100644 --- a/src/qperf.cpp +++ b/src/qperf.cpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include #include @@ -166,9 +166,9 @@ main(int argc, char** argv) options.add_options() ("endpoint_id", "Name of the client", cxxopts::value()->default_value("perf@cisco.com")) ("connect_uri", "Relay to connect to", cxxopts::value()->default_value("moq://localhost:1234")) - ("n,instances", "Number of instances being run", cxxopts::value()->default_value("1")) ("conference_id", "Conference identifier", cxxopts::value()->default_value("1")) - ("i,instance_id", "Instance identifier number", cxxopts::value()->default_value("1")) + ("n,instances", "Number of instances being run", cxxopts::value()) + ("i,instance_id", "Instance identifier number", cxxopts::value()) ("c,config", "Scenario config file", cxxopts::value()) ("h,help", "Print usage"); // clang-format on diff --git a/src/qperf_pub.cpp b/src/qperf_pub.cpp index c76e5e9..58589a4 100644 --- a/src/qperf_pub.cpp +++ b/src/qperf_pub.cpp @@ -6,7 +6,6 @@ #include #include -#include #include #include @@ -235,8 +234,8 @@ namespace qperf { void PerfPublishTrackHandler::WriteThread() { - quicr::Bytes object_0_buffer(perf_config_.bytes_per_group_start); - quicr::Bytes object_not_0_buffer(perf_config_.bytes_per_group); + quicr::Bytes object_0_buffer(perf_config_.first_object_size); + quicr::Bytes object_not_0_buffer(perf_config_.object_size); for (std::size_t i = 0; i < object_0_buffer.size(); i++) { object_0_buffer[i] = i % 255; diff --git a/src/qperf_sub.cpp b/src/qperf_sub.cpp index 9d1a6d3..6aa676c 100644 --- a/src/qperf_sub.cpp +++ b/src/qperf_sub.cpp @@ -6,7 +6,6 @@ #include #include -#include #include #include diff --git a/templates/config-audio.ini b/templates/config-audio.ini deleted file mode 100644 index 8420106..0000000 --- a/templates/config-audio.ini +++ /dev/null @@ -1,13 +0,0 @@ -[Audio] -namespace = perf/{} ; track namespace -name = audio ; track name -track_mode = datagram ; track mode {datagram,stream} -priority = 2 ; priority (0-255) -ttl = 5000 ; ttl in ms -time_interval = 20 ; transmit interval in floating point ms -objs_per_group = 1 ; objects per group count >=1 -bytes_per_group_start = 120 ; size of a group 0 object -bytes_per_group = 120 ; size of a group <> 0 object -start_delay = 5000 ; start delay in ms - after subscribes -total_test_time = 40000 ; total transmit time in ms - including startdelay -; (not configured): total_transmit_time - is calculated total_transmit_time = total_test_time - start_delay diff --git a/templates/config-av.ini b/templates/config-av.ini deleted file mode 100644 index 2a22f4c..0000000 --- a/templates/config-av.ini +++ /dev/null @@ -1,26 +0,0 @@ -[Audio] -namespace = perf/1 ; track namespace -name = 1 ; track name -track_mode = datagram ; track mode {datagram,stream} -priority = 2 ; priority (0-255) -ttl = 5000 ; ttl in ms -time_interval = 20 ; transmit interval in floating point ms -objs_per_group = 1 ; objects per group count >=1 -bytes_per_group_start = 120 ; size of a group 0 object -bytes_per_group = 120 ; size of a group <> 0 object -start_delay = 5000 ; start delay in ms - after subscribes -total_test_time = 40000 ; total transmit time in ms - including start delay -; (not configured): total_transmit_time - is calculated total_transmit_time = total_test_time - start_delay - -[360p Video] -namespace = perf/2 ; track namespace -name = 1 ; track name -track_mode = stream ; track mode {datagram,stream} -priority = 3 ; priority (0-255) -ttl = 5000 ; ttl in ms -time_interval = 33.33 ; transmit interval in floating point ms -objs_per_group = 150 ; objects per group count >=1 -bytes_per_group_start = 21333 ; size of a group 0 object -bytes_per_group = 2666 ; size of a group <> 0 object -start_delay = 5000 ; start delay in ms - after subscribes -total_test_time = 40000 ; total transmit time in ms - including start delay diff --git a/templates/config.template.ini b/templates/config.template.ini index 36415cb..804f81f 100644 --- a/templates/config.template.ini +++ b/templates/config.template.ini @@ -1,25 +1,12 @@ -[TRACK 1] -namespace = perf/1 ; track namespace - can be the same or different than other section tracks -name = 1 ; track name - Should be different than other section tracks -track_mode = datagram ; track mode {datagram,stream} -priority = 1 ; priority (0-255) -ttl = 5000 ; ttl in ms -time_interval = 20.00 ; transmit interval in floating point ms -objs_per_group = 1 ; objects per group count >=1 -bytes_per_group_start = 60 ; size of a group 0 object -bytes_per_group = 60 ; size of a group <> 0 object -start_delay = 10000 ; start delay in ms - after subscribes -total_test_time = 35000 ; total transmit time in ms - including start delay - -[TRACK 2] -namespace = perf/1 ; track namespace - can be the same or different than other section tracks -name = 2 ; track name - Should be different than other section tracks -track_mode = stream ; track mode {datagram,stream} -priority = 2 ; priority (0-255) -ttl = 5000 ; ttl in ms -time_interval = 33.33 ; transmit interval in floating point ms -objs_per_group = 150 ; objects per group count >=1 -bytes_per_group_start = 22000 ; size of a group 0 object -bytes_per_group = 2500 ; size of a group <> 0 object -start_delay = 10000 ; start delay in ms - after subscribes -total_test_time = 35000 ; total transmit time in ms - including start delay +[TRACK] +namespace = {} ; MAY be the same across tracks, entries delimited by / +name = {} ; SHOULD be unique to other tracks +track_mode = {} ; (datagram|stream) +priority = {} ; (0-255) +ttl = {} ; TTL in ms +time_interval = {} ; transmit interval in floating point ms +objects_per_group = {} ; number of objects per group >=1 +first_object_size = {} ; size in bytes of the first object in a group +object_size = {} ; size in bytes of remaining objects in a group +start_delay = {} ; start delay in ms - after control messages are sent and acknowledged +total_transmit_time = {} ; total transmit time in ms From 254164dc1e9f9947e821b8c87986fe96e30a93b3 Mon Sep 17 00:00:00 2001 From: trigaux Date: Wed, 29 Oct 2025 15:26:04 -0400 Subject: [PATCH 3/6] Fix format issue for gcc12 --- include/qperf.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/qperf.hpp b/include/qperf.hpp index 230ed48..36c9d04 100644 --- a/include/qperf.hpp +++ b/include/qperf.hpp @@ -79,7 +79,7 @@ namespace qperf { auto& section = inif[section_name]; - scenario_namespace = std::vformat(section["namespace"].as(), std::make_format_args(instance_id)); + scenario_namespace = fmt::vformat(section["namespace"].as(), fmt::make_format_args(instance_id)); scenario_name = section["name"].as(); perf_config.full_track_name = MakeFullTrackName(scenario_namespace, scenario_name); From 452dbf5889caf456e5a85abfc938228ebc180cac Mon Sep 17 00:00:00 2001 From: trigaux Date: Wed, 29 Oct 2025 15:30:47 -0400 Subject: [PATCH 4/6] Remove format header. --- src/qperf.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/qperf.cpp b/src/qperf.cpp index 76c052f..b566902 100644 --- a/src/qperf.cpp +++ b/src/qperf.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include From 826659163e859f2643a1337f320ca5edd94b813f Mon Sep 17 00:00:00 2001 From: trigaux Date: Tue, 18 Nov 2025 10:27:58 -0500 Subject: [PATCH 5/6] Clean up naming and put back older clients. --- CMakeLists.txt | 54 ++- ...rf_pub.hpp => publisher_track_handler.hpp} | 0 ...f_sub.hpp => subscriber_track_handler.hpp} | 0 src/publisher_track_handler.cpp | 320 +++++++++++++ src/{qperf.cpp => qperf_meeting.cpp} | 30 +- src/qperf_pub.cpp | 430 +++++++----------- src/qperf_sub.cpp | 393 ++++++---------- src/subscriber_track_handler.cpp | 315 +++++++++++++ 8 files changed, 995 insertions(+), 547 deletions(-) rename include/{qperf_pub.hpp => publisher_track_handler.hpp} (100%) rename include/{qperf_sub.hpp => subscriber_track_handler.hpp} (100%) create mode 100644 src/publisher_track_handler.cpp rename src/{qperf.cpp => qperf_meeting.cpp} (88%) create mode 100644 src/subscriber_track_handler.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 6b9ff51..fe6fe38 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,19 +23,61 @@ CPMAddPackage("gh:jarro2783/cxxopts@3.3.1") # Build QPerf executable #=============================================================================# -add_executable(qperf src/qperf.cpp src/qperf_pub.cpp src/qperf_sub.cpp) -target_link_libraries(qperf PRIVATE quicr cxxopts spdlog::spdlog) -target_include_directories(qperf PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include) +add_executable(qperf_meeting src/qperf_meeting.cpp src/publisher_track_handler.cpp src/subscriber_track_handler.cpp) +target_link_libraries(qperf_meeting PRIVATE quicr cxxopts spdlog::spdlog) +target_include_directories(qperf_meeting PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include) -target_compile_options(qperf PRIVATE +target_compile_options(qperf_meeting PRIVATE $<$,$,$>: -Wpedantic -Wextra -Wall> $<$: > ) -set_target_properties(qperf PROPERTIES +set_target_properties(qperf_meeting PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED YES CXX_EXTENSIONS OFF ) -target_compile_definitions(qperf PRIVATE SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_DEBUG) +target_compile_definitions(qperf_meeting PRIVATE SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_DEBUG) + +#=============================================================================# +# Build QPerf Publication executable +#=============================================================================# + +add_executable(qperf_pub src/qperf_pub.cpp src/publisher_track_handler.cpp) +target_link_libraries(qperf_pub PRIVATE quicr cxxopts spdlog::spdlog) +target_include_directories(qperf_pub PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include) + +target_compile_options(qperf_pub PRIVATE + $<$,$,$>: -Wpedantic -Wextra -Wall> + $<$: > +) + +set_target_properties(qperf_pub PROPERTIES + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED YES + CXX_EXTENSIONS OFF +) + +target_compile_definitions(qperf_pub PRIVATE SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_DEBUG) + +#=============================================================================# +# Build QPerf Subscription executable +#=============================================================================# + +add_executable(qperf_sub src/qperf_sub.cpp src/subscriber_track_handler.cpp) +target_link_libraries(qperf_sub PRIVATE quicr cxxopts spdlog::spdlog) +target_include_directories(qperf_sub PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include) + +target_compile_options(qperf_sub PRIVATE + $<$,$,$>: -Wpedantic -Wextra -Wall> + $<$: > +) + +set_target_properties(qperf_sub PROPERTIES + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED YES + CXX_EXTENSIONS OFF +) + +target_compile_definitions(qperf_sub PRIVATE SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_DEBUG) diff --git a/include/qperf_pub.hpp b/include/publisher_track_handler.hpp similarity index 100% rename from include/qperf_pub.hpp rename to include/publisher_track_handler.hpp diff --git a/include/qperf_sub.hpp b/include/subscriber_track_handler.hpp similarity index 100% rename from include/qperf_sub.hpp rename to include/subscriber_track_handler.hpp diff --git a/src/publisher_track_handler.cpp b/src/publisher_track_handler.cpp new file mode 100644 index 0000000..26a3a39 --- /dev/null +++ b/src/publisher_track_handler.cpp @@ -0,0 +1,320 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025 Cisco Systems +// SPDX-License-Identifier: BSD-2-Clause + +#include "publisher_track_handler.hpp" +#include "qperf.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +namespace qperf { + PerfPublishTrackHandler::PerfPublishTrackHandler(const PerfConfig& perf_config) + : PublishTrackHandler(perf_config.full_track_name, perf_config.track_mode, perf_config.priority, perf_config.ttl) + , perf_config_(perf_config) + , terminate_(false) + , last_bytes_(0) + , test_mode_(qperf::TestMode::kNone) + , group_id_(0) + , object_id_(0) + + { + memset(&test_metrics_, '\0', sizeof(test_metrics_)); + } + + std::shared_ptr PerfPublishTrackHandler::Create(const std::string& section_name, + ini::IniFile& inif, + std::uint32_t instance_id) + { + PerfConfig perf_config; + PopulateScenarioFields(section_name, instance_id, inif, perf_config); + return std::shared_ptr(new PerfPublishTrackHandler(perf_config)); + } + + void PerfPublishTrackHandler::StatusChanged(Status status) + { + switch (status) { + case Status::kOk: { + SPDLOG_INFO("PerfPublishTrackeHandler - status kOk"); + auto track_alias = GetTrackAlias().value(); + SPDLOG_INFO("Track alias: {0} is ready to write", track_alias); + write_thread_ = SpawnWriter(); + } break; + case Status::kNotConnected: + SPDLOG_INFO("PerfPublishTrackeHandler - status kNotConnected"); + break; + case Status::kNotAnnounced: + SPDLOG_INFO("PerfPublishTrackeHandler - status kNotAnnounced"); + break; + case Status::kPendingAnnounceResponse: + SPDLOG_INFO("PerfPublishTrackeHandler - status kPendingAnnounceResponse"); + break; + case Status::kAnnounceNotAuthorized: + SPDLOG_INFO("PerfPublishTrackeHandler - status kAnnounceNotAuthorized"); + break; + case Status::kNoSubscribers: + SPDLOG_INFO("PerfPublishTrackeHandler - status kNoSubscribers"); + break; + case Status::kSendingUnannounce: + SPDLOG_INFO("PerfPublishTrackeHandler - status kSendingUnannounce"); + break; + case Status::kPaused: + SPDLOG_INFO("PerfPublishTrackeHandler - status kPaused"); + break; + case Status::kNewGroupRequested: + SPDLOG_INFO("PerfPublishTrackeHandler - status kNewGroupRequested"); + break; + case Status::kSubscriptionUpdated: + SPDLOG_INFO("PerfPublishTrackeHandler - status kSubscriptionUpdated"); + break; + default: + SPDLOG_INFO("PerfPublishTrackeHandler - status UNKNOWN"); + break; + } + } + + void PerfPublishTrackHandler::MetricsSampled(const quicr::PublishTrackMetrics& metrics) + { + std::lock_guard _(mutex_); + auto now = std::chrono::time_point_cast(std::chrono::system_clock::now()); + if (test_mode_ == qperf::TestMode::kRunning && last_bytes_ != 0) { // skip first metric reporting... + // calculate bitrate metrics + auto diff = std::chrono::duration_cast(now - last_metric_time_); + std::uint64_t delta_bytes = metrics.bytes_published - last_bytes_; + std::uint64_t bitrate = ((delta_bytes) * 8) / diff.count(); + test_metrics_.bitrate_total += bitrate; + test_metrics_.max_publish_bitrate = + bitrate > test_metrics_.max_publish_bitrate ? bitrate : test_metrics_.max_publish_bitrate; + test_metrics_.min_publish_bitrate = + bitrate < test_metrics_.min_publish_bitrate ? bitrate : test_metrics_.min_publish_bitrate; + test_metrics_.metric_samples += 1; + test_metrics_.avg_publish_bitrate = test_metrics_.bitrate_total / test_metrics_.metric_samples; + SPDLOG_INFO("{}: Bitrate: {} {} delta bytes {}, delta time {}, {}, {}, {}", + perf_config_.test_name, + bitrate, + FormatBitrate(bitrate), + delta_bytes, + diff.count(), + test_metrics_.min_publish_bitrate, + test_metrics_.max_publish_bitrate, + test_metrics_.avg_publish_bitrate); + } + + last_metric_time_ = now; + last_bytes_ = metrics.bytes_published; + } + + std::chrono::time_point PerfPublishTrackHandler::PublishObjectWithMetrics( + quicr::BytesSpan object_span) + { + std::lock_guard _(mutex_); + ObjectTestHeader test_header; + memset(&test_header, '\0', sizeof(test_header)); + if (perf_config_.objects_per_group > 0) { + if (!(object_id_ % perf_config_.objects_per_group)) { + object_id_ = 0; + group_id_ += 1; + } + } else { + SPDLOG_WARN("{} Error - objects per groups <= 0", perf_config_.test_name); + } + + quicr::ObjectHeaders object_headers; + object_headers.group_id = group_id_; + object_headers.object_id = object_id_; + object_headers.payload_length = 0; // set later + object_headers.priority = perf_config_.priority; + object_headers.ttl = perf_config_.ttl; + + // get current time.. + auto now = std::chrono::system_clock::now(); + auto duration = now.time_since_epoch(); + + // update metrics + if (test_metrics_.start_transmit_time == 0) { + test_metrics_.start_transmit_time = std::chrono::duration_cast(duration).count(); + } + + // fill out test_header + test_header.test_mode = qperf::TestMode::kRunning; + test_header.time = std::chrono::duration_cast(duration).count(); + + // check how much we can write in the header + auto header_bytes_to_copy = + object_span.size() < sizeof(test_header) ? sizeof(test_header.test_mode) : sizeof(test_header); + memcpy((void*)object_span.data(), (void*)&test_header, header_bytes_to_copy); + object_headers.payload_length = object_span.size(); + + // publish + PublishObject(object_headers, object_span); + + SPDLOG_TRACE("PO, RUNNING, {}, {}, {}, {}, {}", + perf_config_.test_name, + group_id_, + object_id_, + publish_track_metrics_.objects_published, + publish_track_metrics_.bytes_published); + + // return current time in ms - publish time + return now; + } + + std::uint64_t PerfPublishTrackHandler::PublishTestComplete() + { + std::lock_guard _(mutex_); + test_mode_ = qperf::TestMode::kComplete; + auto now = std::chrono::system_clock::now(); + auto duration = now.time_since_epoch(); + + ObjectTestComplete test_complete; + memset(&test_complete, '\0', sizeof(test_complete)); + + // start_transmit_time is set when fist object is published + test_metrics_.end_transmit_time = std::chrono::duration_cast(duration).count(); + + // test_metrics_.end_transmit_time; + test_metrics_.total_published_objects = publish_track_metrics_.objects_published + 1; + test_metrics_.total_published_bytes = publish_track_metrics_.bytes_published + sizeof(test_complete); + test_metrics_.total_objects_dropped_not_ok = publish_track_metrics_.objects_dropped_not_ok; + + test_complete.test_mode = test_mode_; + test_complete.time = test_metrics_.end_transmit_time; + memcpy(&test_complete.test_metrics, &test_metrics_, sizeof(test_metrics_)); + + quicr::Bytes object_data(sizeof(test_complete)); + memcpy((void*)&object_data[0], (void*)&test_complete, sizeof(test_complete)); + + object_id_ += 1; + + quicr::ObjectHeaders object_headers; + object_headers.group_id = group_id_; + object_headers.object_id = object_id_; + object_headers.payload_length = 0; + object_headers.priority = perf_config_.priority; + object_headers.ttl = perf_config_.ttl; + + object_headers.payload_length = sizeof(test_complete); + PublishObject(object_headers, object_data); + + auto total_transmit_time = test_metrics_.end_transmit_time - test_metrics_.start_transmit_time; + SPDLOG_INFO("PO, COMPLETE, {}, {}, {}, {}, {}, {}", + perf_config_.test_name, + group_id_, + object_id_, + test_metrics_.total_published_objects, + test_metrics_.total_published_bytes, + total_transmit_time); + SPDLOG_INFO("--------------------------------------------"); + SPDLOG_INFO("{}", perf_config_.test_name); + SPDLOG_INFO("Publish Object - Complete"); + SPDLOG_INFO(" Total transmit time (ms) {}", total_transmit_time); + SPDLOG_INFO(" Total pubished objects {}, bytes {}", + test_metrics_.total_published_objects, + test_metrics_.total_published_bytes); + SPDLOG_INFO(" Bitrate (bps)"); + SPDLOG_INFO(" min {}", test_metrics_.min_publish_bitrate); + SPDLOG_INFO(" max {}", test_metrics_.max_publish_bitrate); + SPDLOG_INFO(" avg {}", test_metrics_.avg_publish_bitrate); + SPDLOG_INFO(" {}", + FormatBitrate(static_cast(test_metrics_.avg_publish_bitrate))); + SPDLOG_INFO("--------------------------------------------"); + + return test_complete.time; + } + + std::thread PerfPublishTrackHandler::SpawnWriter() + { + return std::thread([this] { WriteThread(); }); + } + + void PerfPublishTrackHandler::WriteThread() + { + quicr::Bytes object_0_buffer(perf_config_.first_object_size); + quicr::Bytes object_not_0_buffer(perf_config_.object_size); + + for (std::size_t i = 0; i < object_0_buffer.size(); i++) { + object_0_buffer[i] = i % 255; + } + + for (std::size_t i = 0; i < object_not_0_buffer.size(); i++) { + object_not_0_buffer[i] = i % 255; + } + + group_id_ = 0; + object_id_ = 0; + + if (perf_config_.total_test_time <= 0) { + SPDLOG_WARN("Transmit time <= 0 - stopping test"); + return; + } + + const std::chrono::time_point start_transmit_time = std::chrono::system_clock::now(); + auto transmit_time_ms = std::chrono::milliseconds(perf_config_.total_test_time); + auto end_transmit_time = start_transmit_time + transmit_time_ms; + + // Delay before transmitting + if (perf_config_.start_delay > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(33)); + test_mode_ = qperf::TestMode::kWaitPreTest; + SPDLOG_INFO("{} Waiting start delay {} ms", perf_config_.test_name, perf_config_.start_delay); + const std::chrono::time_point start = std::chrono::system_clock::now(); + auto delay_ms = std::chrono::milliseconds(perf_config_.start_delay); + auto end_time = start + delay_ms; + while (!terminate_) { + auto now = std::chrono::time_point_cast(std::chrono::system_clock::now()); + if (now >= end_time) { + break; + } + std::this_thread::sleep_for(std::chrono::microseconds(500)); + } + } + + // Transmit + SPDLOG_INFO("{} Start transmitting for {} ms", perf_config_.test_name, perf_config_.total_transmit_time); + + test_mode_ = qperf::TestMode::kRunning; + while (!terminate_) { + std::chrono::time_point last_publish_time; + if (object_id_ == 0) { + quicr::BytesSpan object_span(object_0_buffer); + last_publish_time = PublishObjectWithMetrics(object_span); + } else { + quicr::BytesSpan object_span(object_not_0_buffer); + last_publish_time = PublishObjectWithMetrics(object_span); + } + + // Check if we are done... + if (last_publish_time >= end_transmit_time) { + // publish COMPLETE object - end of test + std::this_thread::sleep_for(std::chrono::milliseconds(33)); + PublishTestComplete(); + std::this_thread::sleep_for(std::chrono::milliseconds(perf_config_.start_delay / 2)); + terminate_ = true; + return; + } + + // Wait interval + if (perf_config_.transmit_interval >= 0) { + std::uint64_t interval_us = (perf_config_.transmit_interval * 1000.0f); + std::this_thread::sleep_for(std::chrono::microseconds(interval_us)); + } else { + SPDLOG_WARN("{} Transmit interval is < 0", perf_config_.test_name); + } + object_id_ += 1; + }; + SPDLOG_WARN("{} Exiting writer thread.", perf_config_.test_name); + } + + void PerfPublishTrackHandler::StopWriter() + { + terminate_ = true; + if (write_thread_.joinable()) { + write_thread_.join(); + } + } +} // namespace qperf diff --git a/src/qperf.cpp b/src/qperf_meeting.cpp similarity index 88% rename from src/qperf.cpp rename to src/qperf_meeting.cpp index b566902..abe1bb6 100644 --- a/src/qperf.cpp +++ b/src/qperf_meeting.cpp @@ -1,8 +1,8 @@ // SPDX-FileCopyrightText: Copyright (c) 2025 Cisco Systems // SPDX-License-Identifier: BSD-2-Clause -#include "qperf_pub.hpp" -#include "qperf_sub.hpp" +#include "publisher_track_handler.hpp" +#include "subscriber_track_handler.hpp" #include #include @@ -25,12 +25,12 @@ class PerfClient : public quicr::Client public: PerfClient(const quicr::ClientConfig& cfg, const std::string& configfile, - std::uint32_t conference_id, + std::uint32_t meeting_id, std::uint32_t instances, std::uint32_t instance_identifier) : quicr::Client(cfg) , configfile_(configfile) - , conference_id_(conference_id) + , meeting_id_(meeting_id) , instance_id_(instance_identifier) , instances_(instances) { @@ -45,7 +45,7 @@ class PerfClient : public quicr::Client for (const auto& [section_name, _] : inif_) { auto pub_handler = pub_track_handlers_.emplace_back( - PerfPublishTrackHandler::Create(section_name, inif_, instance_id_ + (conference_id_ * 1000))); + PerfPublishTrackHandler::Create(section_name, inif_, instance_id_ + (meeting_id_ * 1000))); PublishTrack(pub_handler); } @@ -56,7 +56,7 @@ class PerfClient : public quicr::Client for (const auto& [section_name, _] : inif_) { auto sub_handler = sub_track_handlers_.emplace_back( - PerfSubscribeTrackHandler::Create(section_name, inif_, i + (conference_id_ * 1000))); + PerfSubscribeTrackHandler::Create(section_name, inif_, i + (meeting_id_ * 1000))); SubscribeTrack(sub_handler); } } @@ -139,7 +139,7 @@ class PerfClient : public quicr::Client bool terminate_; std::string configfile_; ini::IniFile inif_; - std::uint32_t conference_id_; + std::uint32_t meeting_id_; std::uint32_t instance_id_; std::uint32_t instances_; @@ -163,12 +163,12 @@ main(int argc, char** argv) // clang-format off cxxopts::Options options("QPerf"); options.add_options() - ("endpoint_id", "Name of the client", cxxopts::value()->default_value("perf@cisco.com")) - ("connect_uri", "Relay to connect to", cxxopts::value()->default_value("moq://localhost:1234")) - ("conference_id", "Conference identifier", cxxopts::value()->default_value("1")) - ("n,instances", "Number of instances being run", cxxopts::value()) - ("i,instance_id", "Instance identifier number", cxxopts::value()) - ("c,config", "Scenario config file", cxxopts::value()) + ("endpoint_id", "Name of the client", cxxopts::value()->default_value("perf@cisco.com")) + ("connect_uri", "Relay to connect to", cxxopts::value()->default_value("moq://localhost:1234")) + ("meeting_id", "Meeting identifier", cxxopts::value()->default_value("1")) + ("n,instances", "Number of instances being run", cxxopts::value()) + ("i,instance_id", "Instance identifier number", cxxopts::value()) + ("c,config", "Scenario config file", cxxopts::value()) ("h,help", "Print usage"); // clang-format on @@ -207,12 +207,12 @@ main(int argc, char** argv) const auto logger = spdlog::stderr_color_mt(log_id); - const auto conference_id = result["conference_id"].as(); + const auto meeting_id = result["meeting_id"].as(); const auto instance_id = result["instance_id"].as(); const auto instances = result["instances"].as(); auto client = std::make_shared( - client_config, result["config"].as(), conference_id, instances, instance_id); + client_config, result["config"].as(), meeting_id, instances, instance_id); std::signal(SIGINT, HandleTerminateSignal); diff --git a/src/qperf_pub.cpp b/src/qperf_pub.cpp index 58589a4..6245004 100644 --- a/src/qperf_pub.cpp +++ b/src/qperf_pub.cpp @@ -1,320 +1,206 @@ // SPDX-FileCopyrightText: Copyright (c) 2025 Cisco Systems // SPDX-License-Identifier: BSD-2-Clause -#include "qperf_pub.hpp" +#include "publisher_track_handler.hpp" #include "qperf.hpp" #include #include +#include #include #include +#include #include -#include +#include +#include +#include #include - -namespace qperf { - PerfPublishTrackHandler::PerfPublishTrackHandler(const PerfConfig& perf_config) - : PublishTrackHandler(perf_config.full_track_name, perf_config.track_mode, perf_config.priority, perf_config.ttl) - , perf_config_(perf_config) - , terminate_(false) - , last_bytes_(0) - , test_mode_(qperf::TestMode::kNone) - , group_id_(0) - , object_id_(0) - - { - memset(&test_metrics_, '\0', sizeof(test_metrics_)); - } - - std::shared_ptr PerfPublishTrackHandler::Create(const std::string& section_name, - ini::IniFile& inif, - std::uint32_t instance_id) +#include + +class PerfPubClient : public quicr::Client +{ + public: + PerfPubClient(const quicr::ClientConfig& cfg, const std::string& configfile) + : quicr::Client(cfg) + , configfile_(configfile) { - PerfConfig perf_config; - PopulateScenarioFields(section_name, instance_id, inif, perf_config); - return std::shared_ptr(new PerfPublishTrackHandler(perf_config)); } - void PerfPublishTrackHandler::StatusChanged(Status status) + void StatusChanged(Status status) override { + std::lock_guard _(track_handlers_mutex_); switch (status) { - case Status::kOk: { - SPDLOG_INFO("PerfPublishTrackeHandler - status kOk"); - auto track_alias = GetTrackAlias().value(); - SPDLOG_INFO("Track alias: {0} is ready to write", track_alias); - write_thread_ = SpawnWriter(); - } break; - case Status::kNotConnected: - SPDLOG_INFO("PerfPublishTrackeHandler - status kNotConnected"); + case Status::kReady: + SPDLOG_INFO("PerfPubClient - kReady"); + inif_.load(configfile_); + for (const auto& section_pair : inif_) { + const std::string& section_name = section_pair.first; + auto pub_handler = + track_handlers_.emplace_back(qperf::PerfPublishTrackHandler::Create(section_name, inif_, 0)); + PublishTrack(pub_handler); + } break; - case Status::kNotAnnounced: - SPDLOG_INFO("PerfPublishTrackeHandler - status kNotAnnounced"); + + case Status::kNotReady: + SPDLOG_INFO("PerfPubClient - kNotReady"); break; - case Status::kPendingAnnounceResponse: - SPDLOG_INFO("PerfPublishTrackeHandler - status kPendingAnnounceResponse"); + case Status::kConnecting: + SPDLOG_INFO("PerfPubClient - kConnecting"); break; - case Status::kAnnounceNotAuthorized: - SPDLOG_INFO("PerfPublishTrackeHandler - status kAnnounceNotAuthorized"); + case Status::kDisconnecting: + SPDLOG_INFO("PerfPubClient - kDisconnecting"); break; - case Status::kNoSubscribers: - SPDLOG_INFO("PerfPublishTrackeHandler - status kNoSubscribers"); + case Status::kPendingServerSetup: + SPDLOG_INFO("PerfPubClient - kPendingSeverSetup"); break; - case Status::kSendingUnannounce: - SPDLOG_INFO("PerfPublishTrackeHandler - status kSendingUnannounce"); + + // All of the rest of these are 'errors' and will set terminate_. + case Status::kInternalError: + SPDLOG_INFO("PerfPubClient - kInternalError - terminate"); + terminate_ = true; break; - case Status::kPaused: - SPDLOG_INFO("PerfPublishTrackeHandler - status kPaused"); + case Status::kInvalidParams: + SPDLOG_INFO("PerfPubClient - kInvalidParams - terminate"); + terminate_ = true; break; - case Status::kNewGroupRequested: - SPDLOG_INFO("PerfPublishTrackeHandler - status kNewGroupRequested"); + case Status::kNotConnected: + SPDLOG_INFO("PerfPubClient - kNotConnected - terminate"); + terminate_ = true; break; - case Status::kSubscriptionUpdated: - SPDLOG_INFO("PerfPublishTrackeHandler - status kSubscriptionUpdated"); + case Status::kFailedToConnect: + SPDLOG_INFO("PerfPubClient - kFailedToConnect - terminate"); + terminate_ = true; break; default: - SPDLOG_INFO("PerfPublishTrackeHandler - status UNKNOWN"); + SPDLOG_INFO("PerfPubClient - UNKNOWN - Connection failed {0}", static_cast(status)); + terminate_ = true; break; } } - void PerfPublishTrackHandler::MetricsSampled(const quicr::PublishTrackMetrics& metrics) - { - std::lock_guard _(mutex_); - auto now = std::chrono::time_point_cast(std::chrono::system_clock::now()); - if (test_mode_ == qperf::TestMode::kRunning && last_bytes_ != 0) { // skip first metric reporting... - // calculate bitrate metrics - auto diff = std::chrono::duration_cast(now - last_metric_time_); - std::uint64_t delta_bytes = metrics.bytes_published - last_bytes_; - std::uint64_t bitrate = ((delta_bytes) * 8) / diff.count(); - test_metrics_.bitrate_total += bitrate; - test_metrics_.max_publish_bitrate = - bitrate > test_metrics_.max_publish_bitrate ? bitrate : test_metrics_.max_publish_bitrate; - test_metrics_.min_publish_bitrate = - bitrate < test_metrics_.min_publish_bitrate ? bitrate : test_metrics_.min_publish_bitrate; - test_metrics_.metric_samples += 1; - test_metrics_.avg_publish_bitrate = test_metrics_.bitrate_total / test_metrics_.metric_samples; - SPDLOG_INFO("{}: Bitrate: {} {} delta bytes {}, delta time {}, {}, {}, {}", - perf_config_.test_name, - bitrate, - FormatBitrate(bitrate), - delta_bytes, - diff.count(), - test_metrics_.min_publish_bitrate, - test_metrics_.max_publish_bitrate, - test_metrics_.avg_publish_bitrate); - } + void MetricsSampled(const quicr::ConnectionMetrics&) override {} - last_metric_time_ = now; - last_bytes_ = metrics.bytes_published; - } + bool GetTerminateStatus() { return terminate_; } - std::chrono::time_point PerfPublishTrackHandler::PublishObjectWithMetrics( - quicr::BytesSpan object_span) + bool HandlersComplete() { - std::lock_guard _(mutex_); - ObjectTestHeader test_header; - memset(&test_header, '\0', sizeof(test_header)); - if (perf_config_.objects_per_group > 0) { - if (!(object_id_ % perf_config_.objects_per_group)) { - object_id_ = 0; - group_id_ += 1; + std::lock_guard _(track_handlers_mutex_); + bool ret = true; + // Don't like this - should be dependent on a 'state' + if (track_handlers_.size() > 0) { + for (auto handler : track_handlers_) { + if (!handler->IsComplete()) { + ret = false; + break; + } } } else { - SPDLOG_WARN("{} Error - objects per groups <= 0", perf_config_.test_name); + ret = false; } - - quicr::ObjectHeaders object_headers; - object_headers.group_id = group_id_; - object_headers.object_id = object_id_; - object_headers.payload_length = 0; // set later - object_headers.priority = perf_config_.priority; - object_headers.ttl = perf_config_.ttl; - - // get current time.. - auto now = std::chrono::system_clock::now(); - auto duration = now.time_since_epoch(); - - // update metrics - if (test_metrics_.start_transmit_time == 0) { - test_metrics_.start_transmit_time = std::chrono::duration_cast(duration).count(); - } - - // fill out test_header - test_header.test_mode = qperf::TestMode::kRunning; - test_header.time = std::chrono::duration_cast(duration).count(); - - // check how much we can write in the header - auto header_bytes_to_copy = - object_span.size() < sizeof(test_header) ? sizeof(test_header.test_mode) : sizeof(test_header); - memcpy((void*)object_span.data(), (void*)&test_header, header_bytes_to_copy); - object_headers.payload_length = object_span.size(); - - // publish - PublishObject(object_headers, object_span); - - SPDLOG_TRACE("PO, RUNNING, {}, {}, {}, {}, {}", - perf_config_.test_name, - group_id_, - object_id_, - publish_track_metrics_.objects_published, - publish_track_metrics_.bytes_published); - - // return current time in ms - publish time - return now; + return ret; } - std::uint64_t PerfPublishTrackHandler::PublishTestComplete() + void Terminate() { - std::lock_guard _(mutex_); - test_mode_ = qperf::TestMode::kComplete; - auto now = std::chrono::system_clock::now(); - auto duration = now.time_since_epoch(); - - ObjectTestComplete test_complete; - memset(&test_complete, '\0', sizeof(test_complete)); - - // start_transmit_time is set when fist object is published - test_metrics_.end_transmit_time = std::chrono::duration_cast(duration).count(); - - // test_metrics_.end_transmit_time; - test_metrics_.total_published_objects = publish_track_metrics_.objects_published + 1; - test_metrics_.total_published_bytes = publish_track_metrics_.bytes_published + sizeof(test_complete); - test_metrics_.total_objects_dropped_not_ok = publish_track_metrics_.objects_dropped_not_ok; - - test_complete.test_mode = test_mode_; - test_complete.time = test_metrics_.end_transmit_time; - memcpy(&test_complete.test_metrics, &test_metrics_, sizeof(test_metrics_)); - - quicr::Bytes object_data(sizeof(test_complete)); - memcpy((void*)&object_data[0], (void*)&test_complete, sizeof(test_complete)); - - object_id_ += 1; - - quicr::ObjectHeaders object_headers; - object_headers.group_id = group_id_; - object_headers.object_id = object_id_; - object_headers.payload_length = 0; - object_headers.priority = perf_config_.priority; - object_headers.ttl = perf_config_.ttl; - - object_headers.payload_length = sizeof(test_complete); - PublishObject(object_headers, object_data); - - auto total_transmit_time = test_metrics_.end_transmit_time - test_metrics_.start_transmit_time; - SPDLOG_INFO("PO, COMPLETE, {}, {}, {}, {}, {}, {}", - perf_config_.test_name, - group_id_, - object_id_, - test_metrics_.total_published_objects, - test_metrics_.total_published_bytes, - total_transmit_time); - SPDLOG_INFO("--------------------------------------------"); - SPDLOG_INFO("{}", perf_config_.test_name); - SPDLOG_INFO("Publish Object - Complete"); - SPDLOG_INFO(" Total transmit time (ms) {}", total_transmit_time); - SPDLOG_INFO(" Total pubished objects {}, bytes {}", - test_metrics_.total_published_objects, - test_metrics_.total_published_bytes); - SPDLOG_INFO(" Bitrate (bps)"); - SPDLOG_INFO(" min {}", test_metrics_.min_publish_bitrate); - SPDLOG_INFO(" max {}", test_metrics_.max_publish_bitrate); - SPDLOG_INFO(" avg {}", test_metrics_.avg_publish_bitrate); - SPDLOG_INFO(" {}", - FormatBitrate(static_cast(test_metrics_.avg_publish_bitrate))); - SPDLOG_INFO("--------------------------------------------"); - - return test_complete.time; + std::lock_guard _(track_handlers_mutex_); + for (auto handler : track_handlers_) { + // Stop the handler writer thread... + handler->StopWriter(); + // Unpublish the track + UnpublishTrack(handler); + } + // we are done + terminate_ = true; } - std::thread PerfPublishTrackHandler::SpawnWriter() - { - return std::thread([this] { WriteThread(); }); + private: + bool terminate_; + std::string configfile_; + ini::IniFile inif_; + std::vector> track_handlers_; + std::mutex track_handlers_mutex_; +}; + +bool terminate = false; + +void +HandleTerminateSignal(int) +{ + terminate = true; +} + +int +main(int argc, char** argv) +{ + // clang-format off + cxxopts::Options options("QPerf"); + options.add_options() + ("endpoint_id", "Name of the client", cxxopts::value()->default_value("perf@cisco.com")) + ("connect_uri", "Relay to connect to", cxxopts::value()->default_value("moq://localhost:1234")) + ("c,config", "Scenario config file", cxxopts::value()->default_value("./config.ini")) + ("h,help", "Print usage"); + // clang-format on + + cxxopts::ParseResult result; + + try { + result = options.parse(argc, argv); + } catch (const cxxopts::exceptions::exception& e) { + std::cerr << "Caught exception while parsing arguments: " << e.what() << std::endl; + return EXIT_FAILURE; } - void PerfPublishTrackHandler::WriteThread() - { - quicr::Bytes object_0_buffer(perf_config_.first_object_size); - quicr::Bytes object_not_0_buffer(perf_config_.object_size); - - for (std::size_t i = 0; i < object_0_buffer.size(); i++) { - object_0_buffer[i] = i % 255; - } - - for (std::size_t i = 0; i < object_not_0_buffer.size(); i++) { - object_not_0_buffer[i] = i % 255; - } - - group_id_ = 0; - object_id_ = 0; - - if (perf_config_.total_test_time <= 0) { - SPDLOG_WARN("Transmit time <= 0 - stopping test"); - return; - } - - const std::chrono::time_point start_transmit_time = std::chrono::system_clock::now(); - auto transmit_time_ms = std::chrono::milliseconds(perf_config_.total_test_time); - auto end_transmit_time = start_transmit_time + transmit_time_ms; - - // Delay before transmitting - if (perf_config_.start_delay > 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(33)); - test_mode_ = qperf::TestMode::kWaitPreTest; - SPDLOG_INFO("{} Waiting start delay {} ms", perf_config_.test_name, perf_config_.start_delay); - const std::chrono::time_point start = std::chrono::system_clock::now(); - auto delay_ms = std::chrono::milliseconds(perf_config_.start_delay); - auto end_time = start + delay_ms; - while (!terminate_) { - auto now = std::chrono::time_point_cast(std::chrono::system_clock::now()); - if (now >= end_time) { - break; - } - std::this_thread::sleep_for(std::chrono::microseconds(500)); - } - } - - // Transmit - SPDLOG_INFO("{} Start transmitting for {} ms", perf_config_.test_name, perf_config_.total_transmit_time); - - test_mode_ = qperf::TestMode::kRunning; - while (!terminate_) { - std::chrono::time_point last_publish_time; - if (object_id_ == 0) { - quicr::BytesSpan object_span(object_0_buffer); - last_publish_time = PublishObjectWithMetrics(object_span); - } else { - quicr::BytesSpan object_span(object_not_0_buffer); - last_publish_time = PublishObjectWithMetrics(object_span); - } - - // Check if we are done... - if (last_publish_time >= end_transmit_time) { - // publish COMPLETE object - end of test - std::this_thread::sleep_for(std::chrono::milliseconds(33)); - PublishTestComplete(); - std::this_thread::sleep_for(std::chrono::milliseconds(perf_config_.start_delay / 2)); - terminate_ = true; - return; - } + if (result.count("help")) { + std::cerr << options.help() << std::endl; + return EXIT_SUCCESS; + } - // Wait interval - if (perf_config_.transmit_interval >= 0) { - std::uint64_t interval_us = (perf_config_.transmit_interval * 1000.0f); - std::this_thread::sleep_for(std::chrono::microseconds(interval_us)); - } else { - SPDLOG_WARN("{} Transmit interval is < 0", perf_config_.test_name); - } - object_id_ += 1; - }; - SPDLOG_WARN("{} Exiting writer thread.", perf_config_.test_name); + quicr::TransportConfig config; + config.tls_cert_filename = ""; + config.tls_key_filename = ""; + config.time_queue_max_duration = 5000; + config.use_reset_wait_strategy = false; + config.quic_qlog_path = ""; + + quicr::ClientConfig client_config; + client_config.endpoint_id = result["endpoint_id"].as(); + client_config.metrics_sample_ms = 5000; + client_config.transport_config = config; + client_config.connect_uri = result["connect_uri"].as(); + client_config.tick_service_sleep_delay_us = 50000; + + const auto logger = spdlog::stderr_color_mt("PERF"); + + auto config_file = result["config"].as(); + SPDLOG_INFO("--------------------------------------------"); + SPDLOG_INFO("Starting...pub"); + SPDLOG_INFO("\tconfig file {}", config_file); + SPDLOG_INFO("\tclient config:"); + SPDLOG_INFO("\t\tconnect_uri = {}", client_config.connect_uri); + SPDLOG_INFO("\t\tendpoint = {}", client_config.endpoint_id); + SPDLOG_INFO("--------------------------------------------"); + + std::signal(SIGINT, HandleTerminateSignal); + + auto client = std::make_shared(client_config, config_file); + + try { + client->Connect(); + } catch (const std::exception& e) { + SPDLOG_LOGGER_CRITICAL( + logger, "Failed to connect to relay '{0}' with exception: {1}", client_config.connect_uri, e.what()); + return EXIT_FAILURE; + } catch (...) { + SPDLOG_LOGGER_CRITICAL(logger, "Unexpected error connecting to relay"); + return EXIT_FAILURE; } - void PerfPublishTrackHandler::StopWriter() - { - terminate_ = true; - if (write_thread_.joinable()) { - write_thread_.join(); - } + while (!terminate && !client->HandlersComplete()) { + std::this_thread::sleep_for(std::chrono::seconds(1)); } -} // namespace qperf + + client->Terminate(); + client->Disconnect(); + return EXIT_SUCCESS; +} diff --git a/src/qperf_sub.cpp b/src/qperf_sub.cpp index 6aa676c..8391698 100644 --- a/src/qperf_sub.cpp +++ b/src/qperf_sub.cpp @@ -1,315 +1,200 @@ // SPDX-FileCopyrightText: Copyright (c) 2025 Cisco Systems // SPDX-License-Identifier: BSD-2-Clause -#include "qperf_sub.hpp" -#include "qperf.hpp" +#include "subscriber_track_handler.hpp" #include #include +#include #include #include #include #include -#include -#include -#include -#include +#include +#include #include #include - -namespace qperf { - - std::atomic_bool terminate = false; - - /** - * @brief Subscribe track handler - * @details Subscribe track handler used for the subscribe command line option. - */ - PerfSubscribeTrackHandler::PerfSubscribeTrackHandler(const PerfConfig& perf_config, std::uint32_t test_identifier) - : SubscribeTrackHandler(perf_config.full_track_name, - perf_config.priority, - quicr::messages::GroupOrder::kOriginalPublisherOrder, - quicr::messages::FilterType::kLargestObject) - , terminate_(false) - , perf_config_(perf_config) - , first_pass_(true) - , last_bytes_(0) - , local_now_(0) - , last_local_now_(0) - , total_objects_(0) - , total_bytes_(0) +#include + +class PerfSubClient : public quicr::Client +{ + public: + PerfSubClient(const quicr::ClientConfig& cfg, const std::string& configfile, std::uint32_t test_identifier) + : quicr::Client(cfg) + , configfile_(configfile) , test_identifier_(test_identifier) - , test_mode_(qperf::TestMode::kNone) - , max_bitrate_(0) - , min_bitrate_(0) - , avg_bitrate_(0.0) - , metric_samples_(0) - , bitrate_total_(0) - , max_object_time_delta_(0) - , min_object_time_delta_(std::numeric_limits::max()) - , avg_object_time_delta_(0.0) - , total_time_delta_(0) - , max_object_arrival_delta_(0) - , min_object_arrival_delta_(std::numeric_limits::max()) - , avg_object_arrival_delta_(0.0) - , total_arrival_delta_(0) - { - } - - std::shared_ptr PerfSubscribeTrackHandler::Create(const std::string& section_name, - ini::IniFile& inif, - std::uint32_t instance_id) { - PerfConfig perf_config; - PopulateScenarioFields(section_name, instance_id, inif, perf_config); - return std::shared_ptr(new PerfSubscribeTrackHandler(perf_config, instance_id)); } - void PerfSubscribeTrackHandler::StatusChanged(Status status) + void StatusChanged(Status status) override { switch (status) { - case Status::kOk: { - auto track_alias = GetTrackAlias(); - if (track_alias.has_value()) { - SPDLOG_INFO( - "{}, {}, {} Ready to read", test_identifier_, perf_config_.test_name, track_alias.value()); + case Status::kReady: + SPDLOG_INFO("Client status - kReady"); + inif_.load(configfile_); + for (const auto& section_pair : inif_) { + const std::string& section_name = section_pair.first; + SPDLOG_INFO("Starting test - {}", section_name); + auto sub_handler = track_handlers_.emplace_back( + qperf::PerfSubscribeTrackHandler::Create(section_name, inif_, test_identifier_)); + SubscribeTrack(sub_handler); } break; - } - case Status::kNotConnected: - SPDLOG_INFO("{}, {} Subscribe Handler - kNotConnected", test_identifier_, perf_config_.test_name); + case Status::kNotReady: + SPDLOG_INFO("Client status - kNotReady"); break; - case Status::kNotSubscribed: - SPDLOG_INFO("{}, {} Subscribe Handler - kNotSubscribed", test_identifier_, perf_config_.test_name); + case Status::kConnecting: + SPDLOG_INFO("Client status - kConnecting"); + break; + case Status::kNotConnected: + SPDLOG_INFO("Client status - kNotConnected"); break; - case Status::kPendingResponse: - SPDLOG_INFO( - "{}, {} Subscribe Handler - kPendingSubscribeResponse", test_identifier_, perf_config_.test_name); + case Status::kPendingServerSetup: + SPDLOG_INFO("Client status - kPendingSeverSetup"); break; - // rest of these terminate - case Status::kSendingUnsubscribe: - SPDLOG_INFO("{}, {} Subscribe Handler - kSendingUnsubscribe", test_identifier_, perf_config_.test_name); + case Status::kFailedToConnect: + SPDLOG_ERROR("Client status - kFailedToConnect"); terminate_ = true; break; - case Status::kError: - SPDLOG_INFO("{}, {} Subscribe Handler - kSubscribeError", test_identifier_, perf_config_.test_name); + case Status::kInternalError: + SPDLOG_ERROR("Client status - kInternalError"); terminate_ = true; break; - case Status::kNotAuthorized: - SPDLOG_INFO("{}, {} Subscribe Handler - kNotAuthorized", test_identifier_, perf_config_.test_name); + case Status::kInvalidParams: + SPDLOG_ERROR("Client status - kInvalidParams"); terminate_ = true; break; default: - SPDLOG_INFO("{}, {} Subscribe Handler - UNKNOWN", test_identifier_, perf_config_.test_name); - // leave... + SPDLOG_ERROR("Connection failed {0}", static_cast(status)); terminate_ = true; break; } } - void PerfSubscribeTrackHandler::ObjectReceived(const quicr::ObjectHeaders& object_header, - quicr::BytesSpan data_span) - { - auto received_time = std::chrono::system_clock::now(); - local_now_ = std::chrono::time_point_cast(received_time).time_since_epoch().count(); - - total_objects_ += 1; - total_bytes_ += data_span.size(); - - if (first_pass_) { + void MetricsSampled(const quicr::ConnectionMetrics&) override {} - last_local_now_ = local_now_; - start_data_time_ = local_now_; + bool HandlersComplete() + { + std::lock_guard _(track_handlers_mutex_); + bool ret = true; + // Don't like this - should be dependent on a 'state' + if (track_handlers_.size() > 0) { + for (auto handler : track_handlers_) { + if (!handler->IsComplete()) { + ret = false; + } + } + } else { + ret = false; } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + return ret; + } - memcpy(&test_mode_, data_span.data(), sizeof(std::uint8_t)); - - if (test_mode_ == qperf::TestMode::kRunning) { + void Terminate() + { + std::lock_guard _(track_handlers_mutex_); + for (auto handler : track_handlers_) { + // Unpublish the track + SPDLOG_INFO("unsubscribe track {}", handler->TestName()); + UnsubscribeTrack(handler); + } + // we are done + terminate_ = true; + } - qperf::ObjectTestHeader test_header; - memset(&test_header, '\0', sizeof(test_header)); - memcpy(&test_header, - &data_span[0], - data_span.size() < sizeof(test_header) ? sizeof(test_header.test_mode) : sizeof(test_header)); + private: + bool terminate_; + std::string configfile_; + ini::IniFile inif_; + std::uint32_t test_identifier_; - auto remote_now = test_header.time; - std::int64_t transmit_delta = local_now_ - remote_now; - std::int64_t arrival_delta = local_now_ - last_local_now_; + std::vector> track_handlers_; - if (transmit_delta <= 0) { - SPDLOG_INFO("-- negative/zero transmit delta (check ntp) -- {} {} {} {} {}", - object_header.group_id, - object_header.object_id, - local_now_, - remote_now, - transmit_delta); - } + std::mutex track_handlers_mutex_; +}; - if (arrival_delta <= 0) { - SPDLOG_INFO("-- negative/zero arrival delta -- {} {} {} {} {}", - object_header.group_id, - object_header.object_id, - local_now_, - last_local_now_, - arrival_delta); - } +bool terminate = false; - if (first_pass_) { - SPDLOG_INFO("--------------------------------------------"); - SPDLOG_INFO("{}", perf_config_.test_name); - SPDLOG_INFO("Started Receiving"); - SPDLOG_INFO("\tTest time {} ms", perf_config_.total_transmit_time); - SPDLOG_INFO("--------------------------------------------"); - } +void +HandleTerminateSignal(int) +{ + terminate = true; +} - SPDLOG_TRACE("OR, RUNNING, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}", - test_identifier_, - perf_config_.test_name, - object_header.group_id, - object_header.object_id, - data_span.size(), - local_now_, - remote_now, - transmit_delta, - arrival_delta, - total_objects_, - total_bytes_); +int +main(int argc, char** argv) +{ + // clang-format off + cxxopts::Options options("QPerf"); + options.add_options() + ("endpoint_id", "Name of the client", cxxopts::value()->default_value("perf@cisco.com")) + ("connect_uri", "Relay to connect to", cxxopts::value()->default_value("moq://localhost:1234")) + ("i,test_id", "Test idenfiter number", cxxopts::value()->default_value("1")) + ("c,config", "Scenario config file", cxxopts::value()) + ("h,help", "Print usage"); + // clang-format on + + cxxopts::ParseResult result; + + try { + result = options.parse(argc, argv); + } catch (const cxxopts::exceptions::exception& e) { + std::cerr << "Caught exception while parsing arguments: " << e.what() << std::endl; + return EXIT_FAILURE; + } - if (!first_pass_) { + if (result.count("help")) { + std::cerr << options.help() << std::endl; + return EXIT_SUCCESS; + } - total_time_delta_ += transmit_delta; - max_object_time_delta_ = transmit_delta > (std::int64_t)max_object_time_delta_ - ? transmit_delta - : (std::int64_t)max_object_time_delta_; - min_object_time_delta_ = transmit_delta < (std::int64_t)min_object_time_delta_ - ? transmit_delta - : (std::int64_t)min_object_time_delta_; + quicr::TransportConfig config; + config.tls_cert_filename = ""; + config.tls_key_filename = ""; + config.time_queue_max_duration = 5000; + config.use_reset_wait_strategy = false; + config.quic_qlog_path = ""; - total_arrival_delta_ += arrival_delta; - max_object_arrival_delta_ = arrival_delta > (std::int64_t)max_object_arrival_delta_ - ? arrival_delta - : (std::int64_t)max_object_arrival_delta_; - min_object_arrival_delta_ = arrival_delta < (std::int64_t)min_object_arrival_delta_ - ? arrival_delta - : (std::int64_t)min_object_arrival_delta_; - } + auto endpoint_test_id = + result["endpoint_id"].as() + ":" + std::to_string(result["test_id"].as()); - } else if (test_mode_ == qperf::TestMode::kComplete) { + quicr::ClientConfig client_config; + client_config.connect_uri = result["connect_uri"].as(); + client_config.endpoint_id = endpoint_test_id; + client_config.metrics_sample_ms = 5000; + client_config.transport_config = config; + client_config.tick_service_sleep_delay_us = 50'000; - ObjectTestComplete test_complete; + auto log_id = endpoint_test_id; - memset(&test_complete, '\0', sizeof(test_complete)); - memcpy(&test_complete, data_span.data(), sizeof(test_complete)); + const auto logger = spdlog::stderr_color_mt(log_id); - std::int64_t total_time = local_now_ - start_data_time_; - avg_object_time_delta_ = (double)total_time_delta_ / (double)total_objects_; - avg_object_arrival_delta_ = - (double)total_arrival_delta_ / (double)total_objects_ - 1; // subtract 1st object + auto test_identifier = result["test_id"].as(); - SPDLOG_INFO("--------------------------------------------"); - SPDLOG_INFO("{}", perf_config_.test_name); - SPDLOG_INFO("Testing Complete"); - SPDLOG_INFO(" Total test run time (ms) {}", total_time / 1000.0f); - SPDLOG_INFO(" Configured test time (ms) {}", perf_config_.total_transmit_time); - SPDLOG_INFO(" Total subscribed objects {}, bytes {}", total_objects_, total_bytes_); - SPDLOG_INFO(" Total published objects {}, bytes {}", - test_complete.test_metrics.total_published_objects, - test_complete.test_metrics.total_published_bytes); - SPDLOG_INFO(" Subscribed delta objects {}, bytes {}", - test_complete.test_metrics.total_published_objects - total_objects_, - test_complete.test_metrics.total_published_bytes - total_bytes_); - SPDLOG_INFO(" Bitrate (bps):"); - SPDLOG_INFO(" min {}", min_bitrate_); - SPDLOG_INFO(" max {}", max_bitrate_); - SPDLOG_INFO(" avg {:.3f}", avg_bitrate_); - SPDLOG_INFO(" {}", FormatBitrate(static_cast(avg_bitrate_))); - SPDLOG_INFO(" Object time delta (us):"); - SPDLOG_INFO(" min {}", min_object_time_delta_); - SPDLOG_INFO(" max {}", max_object_time_delta_); - SPDLOG_INFO(" avg {:04.3f} ", avg_object_time_delta_); - SPDLOG_INFO(" Object arrival delta (us):"); - SPDLOG_INFO(" min {}", min_object_arrival_delta_); - SPDLOG_INFO(" max {}", max_object_arrival_delta_); - SPDLOG_INFO(" avg {:04.3f}", avg_object_arrival_delta_); - SPDLOG_INFO(" over_multiplier {}", - static_cast(avg_object_arrival_delta_ / (perf_config_.transmit_interval * 10000))); - SPDLOG_INFO("--------------------------------------------"); + auto client = std::make_shared(client_config, result["config"].as(), test_identifier); - // id,test_name,total_time,total_transmit_time,total_objects,total_bytes,sent_object,sent_bytes,min_bitrate, - // max_bitrate,avg_bitrate,min_time,maxtime,avg_time,min_arrival,max_arrival,avg_arrival, - // delta_objects,arrival_over_multiplier - SPDLOG_INFO("OR COMPLETE, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}", - test_identifier_, - perf_config_.test_name, - total_time, - perf_config_.total_transmit_time, - total_objects_, - total_bytes_, - test_complete.test_metrics.total_published_objects, - test_complete.test_metrics.total_published_bytes, - min_bitrate_, - max_bitrate_, - avg_bitrate_, - min_object_time_delta_, - max_object_time_delta_, - avg_object_time_delta_, - min_object_arrival_delta_, - max_object_arrival_delta_, - avg_object_arrival_delta_, - test_complete.test_metrics.total_published_objects - total_objects_, - static_cast(avg_object_arrival_delta_ / (perf_config_.transmit_interval * 10000))); - terminate_ = true; - return; - } else { - SPDLOG_WARN( - "OR, {}, {} - unknown data identifier {}", test_identifier_, perf_config_.test_name, (int)test_mode_); - } + std::signal(SIGINT, HandleTerminateSignal); - last_local_now_ = local_now_; - first_pass_ = false; + try { + client->Connect(); + } catch (const std::exception& e) { + SPDLOG_LOGGER_CRITICAL( + logger, "Failed to connect to relay '{0}' with exception: {1}", client_config.connect_uri, e.what()); + return EXIT_FAILURE; + } catch (...) { + SPDLOG_LOGGER_CRITICAL(logger, "Unexpected error connecting to relay"); + return EXIT_FAILURE; } - void PerfSubscribeTrackHandler::MetricsSampled(const quicr::SubscribeTrackMetrics& metrics) - { - metrics_ = metrics; - if (last_bytes_ == 0) { - last_metric_time_ = - std::chrono::time_point_cast(std::chrono::system_clock::now()); - last_bytes_ = metrics.bytes_received; - return; - } - - auto now = std::chrono::time_point_cast(std::chrono::system_clock::now()); - auto diff = std::chrono::duration_cast(now - last_metric_time_); + while (!terminate && !client->HandlersComplete()) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } - if (test_mode_ == qperf::TestMode::kRunning) { - std::uint64_t delta_bytes = metrics_.bytes_received - last_bytes_; - std::uint64_t bitrate = ((delta_bytes) * 8) / std::max(diff.count(), std::int64_t(1)); - metric_samples_ += 1; - bitrate_total_ += bitrate; - if (min_bitrate_ == 0) { - min_bitrate_ = bitrate; - } - max_bitrate_ = bitrate > max_bitrate_ ? bitrate : max_bitrate_; - min_bitrate_ = bitrate < min_bitrate_ ? bitrate : min_bitrate_; - avg_bitrate_ = (double)bitrate_total_ / (double)metric_samples_; - SPDLOG_INFO("Metrics:, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}", - test_identifier_, - perf_config_.test_name, - bitrate, - FormatBitrate(bitrate), - delta_bytes, - diff.count(), - metrics_.objects_received, - metrics_.bytes_received, - max_bitrate_, - min_bitrate_, - avg_bitrate_); - } + client->Terminate(); + client->Disconnect(); - last_metric_time_ = std::chrono::time_point_cast(std::chrono::system_clock::now()); - last_bytes_ = metrics.bytes_received; - } + return EXIT_SUCCESS; } diff --git a/src/subscriber_track_handler.cpp b/src/subscriber_track_handler.cpp new file mode 100644 index 0000000..ba45bcc --- /dev/null +++ b/src/subscriber_track_handler.cpp @@ -0,0 +1,315 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025 Cisco Systems +// SPDX-License-Identifier: BSD-2-Clause + +#include "subscriber_track_handler.hpp" +#include "qperf.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace qperf { + + std::atomic_bool terminate = false; + + /** + * @brief Subscribe track handler + * @details Subscribe track handler used for the subscribe command line option. + */ + PerfSubscribeTrackHandler::PerfSubscribeTrackHandler(const PerfConfig& perf_config, std::uint32_t test_identifier) + : SubscribeTrackHandler(perf_config.full_track_name, + perf_config.priority, + quicr::messages::GroupOrder::kOriginalPublisherOrder, + quicr::messages::FilterType::kLargestObject) + , terminate_(false) + , perf_config_(perf_config) + , first_pass_(true) + , last_bytes_(0) + , local_now_(0) + , last_local_now_(0) + , total_objects_(0) + , total_bytes_(0) + , test_identifier_(test_identifier) + , test_mode_(qperf::TestMode::kNone) + , max_bitrate_(0) + , min_bitrate_(0) + , avg_bitrate_(0.0) + , metric_samples_(0) + , bitrate_total_(0) + , max_object_time_delta_(0) + , min_object_time_delta_(std::numeric_limits::max()) + , avg_object_time_delta_(0.0) + , total_time_delta_(0) + , max_object_arrival_delta_(0) + , min_object_arrival_delta_(std::numeric_limits::max()) + , avg_object_arrival_delta_(0.0) + , total_arrival_delta_(0) + { + } + + std::shared_ptr PerfSubscribeTrackHandler::Create(const std::string& section_name, + ini::IniFile& inif, + std::uint32_t instance_id) + { + PerfConfig perf_config; + PopulateScenarioFields(section_name, instance_id, inif, perf_config); + return std::shared_ptr(new PerfSubscribeTrackHandler(perf_config, instance_id)); + } + + void PerfSubscribeTrackHandler::StatusChanged(Status status) + { + switch (status) { + case Status::kOk: { + auto track_alias = GetTrackAlias(); + if (track_alias.has_value()) { + SPDLOG_INFO( + "{}, {}, {} Ready to read", test_identifier_, perf_config_.test_name, track_alias.value()); + } + break; + } + case Status::kNotConnected: + SPDLOG_INFO("{}, {} Subscribe Handler - kNotConnected", test_identifier_, perf_config_.test_name); + break; + case Status::kNotSubscribed: + SPDLOG_INFO("{}, {} Subscribe Handler - kNotSubscribed", test_identifier_, perf_config_.test_name); + break; + case Status::kPendingResponse: + SPDLOG_INFO( + "{}, {} Subscribe Handler - kPendingSubscribeResponse", test_identifier_, perf_config_.test_name); + break; + + // rest of these terminate + case Status::kSendingUnsubscribe: + SPDLOG_INFO("{}, {} Subscribe Handler - kSendingUnsubscribe", test_identifier_, perf_config_.test_name); + terminate_ = true; + break; + case Status::kError: + SPDLOG_INFO("{}, {} Subscribe Handler - kSubscribeError", test_identifier_, perf_config_.test_name); + terminate_ = true; + break; + case Status::kNotAuthorized: + SPDLOG_INFO("{}, {} Subscribe Handler - kNotAuthorized", test_identifier_, perf_config_.test_name); + terminate_ = true; + break; + default: + SPDLOG_INFO("{}, {} Subscribe Handler - UNKNOWN", test_identifier_, perf_config_.test_name); + // leave... + terminate_ = true; + break; + } + } + + void PerfSubscribeTrackHandler::ObjectReceived(const quicr::ObjectHeaders& object_header, + quicr::BytesSpan data_span) + { + auto received_time = std::chrono::system_clock::now(); + local_now_ = std::chrono::time_point_cast(received_time).time_since_epoch().count(); + + total_objects_ += 1; + total_bytes_ += data_span.size(); + + if (first_pass_) { + + last_local_now_ = local_now_; + start_data_time_ = local_now_; + } + + memcpy(&test_mode_, data_span.data(), sizeof(std::uint8_t)); + + if (test_mode_ == qperf::TestMode::kRunning) { + + qperf::ObjectTestHeader test_header; + memset(&test_header, '\0', sizeof(test_header)); + memcpy(&test_header, + &data_span[0], + data_span.size() < sizeof(test_header) ? sizeof(test_header.test_mode) : sizeof(test_header)); + + auto remote_now = test_header.time; + std::int64_t transmit_delta = local_now_ - remote_now; + std::int64_t arrival_delta = local_now_ - last_local_now_; + + if (transmit_delta <= 0) { + SPDLOG_INFO("-- negative/zero transmit delta (check ntp) -- {} {} {} {} {}", + object_header.group_id, + object_header.object_id, + local_now_, + remote_now, + transmit_delta); + } + + if (arrival_delta <= 0) { + SPDLOG_INFO("-- negative/zero arrival delta -- {} {} {} {} {}", + object_header.group_id, + object_header.object_id, + local_now_, + last_local_now_, + arrival_delta); + } + + if (first_pass_) { + SPDLOG_INFO("--------------------------------------------"); + SPDLOG_INFO("{}", perf_config_.test_name); + SPDLOG_INFO("Started Receiving"); + SPDLOG_INFO("\tTest time {} ms", perf_config_.total_transmit_time); + SPDLOG_INFO("--------------------------------------------"); + } + + SPDLOG_TRACE("OR, RUNNING, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}", + test_identifier_, + perf_config_.test_name, + object_header.group_id, + object_header.object_id, + data_span.size(), + local_now_, + remote_now, + transmit_delta, + arrival_delta, + total_objects_, + total_bytes_); + + if (!first_pass_) { + + total_time_delta_ += transmit_delta; + max_object_time_delta_ = transmit_delta > (std::int64_t)max_object_time_delta_ + ? transmit_delta + : (std::int64_t)max_object_time_delta_; + min_object_time_delta_ = transmit_delta < (std::int64_t)min_object_time_delta_ + ? transmit_delta + : (std::int64_t)min_object_time_delta_; + + total_arrival_delta_ += arrival_delta; + max_object_arrival_delta_ = arrival_delta > (std::int64_t)max_object_arrival_delta_ + ? arrival_delta + : (std::int64_t)max_object_arrival_delta_; + min_object_arrival_delta_ = arrival_delta < (std::int64_t)min_object_arrival_delta_ + ? arrival_delta + : (std::int64_t)min_object_arrival_delta_; + } + + } else if (test_mode_ == qperf::TestMode::kComplete) { + + ObjectTestComplete test_complete; + + memset(&test_complete, '\0', sizeof(test_complete)); + memcpy(&test_complete, data_span.data(), sizeof(test_complete)); + + std::int64_t total_time = local_now_ - start_data_time_; + avg_object_time_delta_ = (double)total_time_delta_ / (double)total_objects_; + avg_object_arrival_delta_ = + (double)total_arrival_delta_ / (double)total_objects_ - 1; // subtract 1st object + + SPDLOG_INFO("--------------------------------------------"); + SPDLOG_INFO("{}", perf_config_.test_name); + SPDLOG_INFO("Testing Complete"); + SPDLOG_INFO(" Total test run time (ms) {}", total_time / 1000.0f); + SPDLOG_INFO(" Configured test time (ms) {}", perf_config_.total_transmit_time); + SPDLOG_INFO(" Total subscribed objects {}, bytes {}", total_objects_, total_bytes_); + SPDLOG_INFO(" Total published objects {}, bytes {}", + test_complete.test_metrics.total_published_objects, + test_complete.test_metrics.total_published_bytes); + SPDLOG_INFO(" Subscribed delta objects {}, bytes {}", + test_complete.test_metrics.total_published_objects - total_objects_, + test_complete.test_metrics.total_published_bytes - total_bytes_); + SPDLOG_INFO(" Bitrate (bps):"); + SPDLOG_INFO(" min {}", min_bitrate_); + SPDLOG_INFO(" max {}", max_bitrate_); + SPDLOG_INFO(" avg {:.3f}", avg_bitrate_); + SPDLOG_INFO(" {}", FormatBitrate(static_cast(avg_bitrate_))); + SPDLOG_INFO(" Object time delta (us):"); + SPDLOG_INFO(" min {}", min_object_time_delta_); + SPDLOG_INFO(" max {}", max_object_time_delta_); + SPDLOG_INFO(" avg {:04.3f} ", avg_object_time_delta_); + SPDLOG_INFO(" Object arrival delta (us):"); + SPDLOG_INFO(" min {}", min_object_arrival_delta_); + SPDLOG_INFO(" max {}", max_object_arrival_delta_); + SPDLOG_INFO(" avg {:04.3f}", avg_object_arrival_delta_); + SPDLOG_INFO(" over_multiplier {}", + static_cast(avg_object_arrival_delta_ / (perf_config_.transmit_interval * 10000))); + SPDLOG_INFO("--------------------------------------------"); + + // id,test_name,total_time,total_transmit_time,total_objects,total_bytes,sent_object,sent_bytes,min_bitrate, + // max_bitrate,avg_bitrate,min_time,maxtime,avg_time,min_arrival,max_arrival,avg_arrival, + // delta_objects,arrival_over_multiplier + SPDLOG_INFO("OR COMPLETE, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}", + test_identifier_, + perf_config_.test_name, + total_time, + perf_config_.total_transmit_time, + total_objects_, + total_bytes_, + test_complete.test_metrics.total_published_objects, + test_complete.test_metrics.total_published_bytes, + min_bitrate_, + max_bitrate_, + avg_bitrate_, + min_object_time_delta_, + max_object_time_delta_, + avg_object_time_delta_, + min_object_arrival_delta_, + max_object_arrival_delta_, + avg_object_arrival_delta_, + test_complete.test_metrics.total_published_objects - total_objects_, + static_cast(avg_object_arrival_delta_ / (perf_config_.transmit_interval * 10000))); + terminate_ = true; + return; + } else { + SPDLOG_WARN( + "OR, {}, {} - unknown data identifier {}", test_identifier_, perf_config_.test_name, (int)test_mode_); + } + + last_local_now_ = local_now_; + first_pass_ = false; + } + + void PerfSubscribeTrackHandler::MetricsSampled(const quicr::SubscribeTrackMetrics& metrics) + { + metrics_ = metrics; + if (last_bytes_ == 0) { + last_metric_time_ = + std::chrono::time_point_cast(std::chrono::system_clock::now()); + last_bytes_ = metrics.bytes_received; + return; + } + + auto now = std::chrono::time_point_cast(std::chrono::system_clock::now()); + auto diff = std::chrono::duration_cast(now - last_metric_time_); + + if (test_mode_ == qperf::TestMode::kRunning) { + std::uint64_t delta_bytes = metrics_.bytes_received - last_bytes_; + std::uint64_t bitrate = ((delta_bytes) * 8) / std::max(diff.count(), std::int64_t(1)); + metric_samples_ += 1; + bitrate_total_ += bitrate; + if (min_bitrate_ == 0) { + min_bitrate_ = bitrate; + } + max_bitrate_ = bitrate > max_bitrate_ ? bitrate : max_bitrate_; + min_bitrate_ = bitrate < min_bitrate_ ? bitrate : min_bitrate_; + avg_bitrate_ = (double)bitrate_total_ / (double)metric_samples_; + SPDLOG_INFO("Metrics:, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}", + test_identifier_, + perf_config_.test_name, + bitrate, + FormatBitrate(bitrate), + delta_bytes, + diff.count(), + metrics_.objects_received, + metrics_.bytes_received, + max_bitrate_, + min_bitrate_, + avg_bitrate_); + } + + last_metric_time_ = std::chrono::time_point_cast(std::chrono::system_clock::now()); + last_bytes_ = metrics.bytes_received; + } +} From 619f3fb302403d208c4982b046c041a028e60a53 Mon Sep 17 00:00:00 2001 From: trigaux Date: Tue, 18 Nov 2025 10:41:12 -0500 Subject: [PATCH 6/6] Add separate script for meetings and revert sub script. --- scripts/run_parallel_meetings.sh | 45 ++++++++++++++++++++++++++++++++ scripts/run_parallel_subs.sh | 26 +++++------------- 2 files changed, 51 insertions(+), 20 deletions(-) create mode 100755 scripts/run_parallel_meetings.sh diff --git a/scripts/run_parallel_meetings.sh b/scripts/run_parallel_meetings.sh new file mode 100755 index 0000000..5a0a189 --- /dev/null +++ b/scripts/run_parallel_meetings.sh @@ -0,0 +1,45 @@ +#!/bin/sh + +LOGS_DIR=qperf_logs + +if [ -z "$1" ]; then + echo "Using default number of meetings" + MEETINGS=${1:-100} +elif [ "$1" -eq 0 ]; then + echo "Num meetings must be greater than 0" + exit 1 +else + MEETINGS=${1:-$1} +fi + +if [ -z "$2" ]; then + RELAY="moq://localhost:33435" +else + RELAY="$2" +fi + +if [ -z "$3" ]; then + echo "Config file is required" + exit 1 +else + CONFIG_PATH="$3" +fi + +if [ -z "$4" ]; then + echo "Using default number of clients" + INSTANCES=${1:-100} +elif [ "$4" -eq 0 ]; then + echo "Num clients must be greater than 0" + exit 1 +else + INSTANCES=${4:-$4} +fi + +echo "Running $MEETINGS meetings with $INSTANCES clients each" + +rm -rf $LOGS_DIR +mkdir -p $LOGS_DIR + +for conference_id in $(seq 1 $MEETINGS); do + parallel -j ${INSTANCES} "./qperf --conference_id $conference_id -i {} -n $INSTANCES -c $CONFIG_PATH --connect_uri $RELAY > $LOGS_DIR/t_$conference_id{}logs.txt 2>&1 &" ::: $(seq ${INSTANCES}) +done diff --git a/scripts/run_parallel_subs.sh b/scripts/run_parallel_subs.sh index eec8d27..8151dcc 100755 --- a/scripts/run_parallel_subs.sh +++ b/scripts/run_parallel_subs.sh @@ -3,13 +3,13 @@ LOGS_DIR=qperf_logs if [ -z "$1" ]; then - echo "Using default number of conferences" - CONFERENCES=${1:-100} + echo "Using default number of subscriber clients" + NUM_SUBS=${1:-100} elif [ "$1" -eq 0 ]; then - echo "Num conferences must be greater than 0" + echo "Num subscribers must be greater than 0" exit 1 else - CONFERENCES=${1:-$1} + NUM_SUBS=${1:-$1} # Arg 1 fi if [ -z "$2" ]; then @@ -25,21 +25,7 @@ else CONFIG_PATH="$3" fi -if [ -z "$4" ]; then - echo "Using default number of clients" - INSTANCES=${1:-100} -elif [ "$4" -eq 0 ]; then - echo "Num clients must be greater than 0" - exit 1 -else - INSTANCES=${4:-$4} -fi +echo "Running $NUM_SUBS subscriber clients" -echo "Running $CONFERENCES conferences with $INSTANCES clients each" - -rm -rf $LOGS_DIR mkdir -p $LOGS_DIR - -for conference_id in $(seq 1 $CONFERENCES); do - parallel -j ${INSTANCES} "./qperf --conference_id $conference_id -i {} -n $INSTANCES -c $CONFIG_PATH --connect_uri $RELAY > $LOGS_DIR/t_$conference_id{}logs.txt 2>&1 &" ::: $(seq ${INSTANCES}) -done +parallel -j ${NUM_SUBS} "./qperf_sub -i {} -c $CONFIG_PATH --connect_uri $RELAY > $LOGS_DIR/t_{}logs.txt 2>&1" ::: $(seq ${NUM_SUBS})