From 2552a6da8f37ec1fb027a3e14d827bf42641f9b0 Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Tue, 27 Jan 2026 19:20:00 -0800 Subject: [PATCH 01/12] Add subgroup support with merge of latest main --- CMakeLists.txt | 2 +- dependencies/libquicr | 2 +- src/client_manager.cc | 3 +- src/fetch_handler.cc | 24 +++++----- src/peering/peer_manager.cc | 5 +- src/peering/peer_session.cc | 7 --- src/publish_handler.h | 7 --- src/subscribe_handler.cc | 93 +++++++++++++++++-------------------- src/subscribe_handler.h | 9 ++-- 9 files changed, 64 insertions(+), 88 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1ea512d..6b726de 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,7 +15,7 @@ set (CMAKE_POLICY_VERSION_MINIMUM 3.5) ### Define the project name and version ### Version will be added to version output of apps and library project(laps - VERSION 0.14.40 + VERSION 0.14.41 DESCRIPTION "Latency Aware Publish/Subscribe" LANGUAGES CXX) diff --git a/dependencies/libquicr b/dependencies/libquicr index d6deca9..2dadd7f 160000 --- a/dependencies/libquicr +++ b/dependencies/libquicr @@ -1 +1 @@ -Subproject commit d6deca9a30c55e31054bcd5d7681ba33cfb93231 +Subproject commit 2dadd7f114fac78c05b5836c6e8e4df97c0b5edc diff --git a/src/client_manager.cc b/src/client_manager.cc index 3adfe53..40ed1f2 100644 --- a/src/client_manager.cc +++ b/src/client_manager.cc @@ -1029,7 +1029,8 @@ namespace laps { if (!(it->second->GetPendingNewRquestId().has_value() && *it->second->GetPendingNewRquestId() == 0 && group_id == 0) && - (group_id == 0 || it->second->GetLatestGroupId() < group_id)) { + (group_id == 0 || (it->second->GetPendingNewRquestId().has_value() && + it->second->GetPendingNewRquestId().value() < group_id))) { it->second->SetNewGroupRequestId(group_id); DampenOrUpdateTrackSubscription(it->second, true); diff --git a/src/fetch_handler.cc b/src/fetch_handler.cc index 68f994b..a5d01c1 100644 --- a/src/fetch_handler.cc +++ b/src/fetch_handler.cc @@ -31,32 +31,32 @@ namespace laps { std::shared_ptr> data) { subscribe_track_metrics_.bytes_received += data->size(); + auto& stream = streams_[stream_id]; if (is_start) { - // Process the fetch header and update it so that it works for the requestor - stream_buffer_.Clear(); + stream.buffer.Clear(); - stream_buffer_.InitAny(); - stream_buffer_.Push(*data); + stream.buffer.InitAny(); + stream.buffer.Push(*data); // Expect that on initial start of stream, there is enough data to process the stream headers - auto f_hdr = stream_buffer_.GetAny(); - if (not(stream_buffer_ >> f_hdr)) { + auto f_hdr = stream.buffer.GetAny(); + if (not(stream.buffer >> f_hdr)) { SPDLOG_ERROR("Not enough data to process new stream headers, stream is invalid len: {} / {}", - stream_buffer_.Size(), + stream.buffer.Size(), data->size()); // TODO: Add metrics to track this return; } - size_t header_size = data->size() - stream_buffer_.Size(); + size_t header_size = data->size() - stream.buffer.Size(); SPDLOG_DEBUG("Fetch header added in rid: {} out rid: {} data sz: {} sbuf_size: {} header size: {}", f_hdr.request_id, *publish_fetch_handler_->GetRequestId(), data->size(), - stream_buffer_.Size(), + stream.buffer.Size(), header_size); f_hdr.request_id = *publish_fetch_handler_->GetRequestId(); @@ -65,12 +65,12 @@ namespace laps { if (header_size < data->size()) { bytes->insert(bytes->end(), data->begin() + header_size, data->end()); - stream_buffer_.Pop(stream_buffer_.Size()); + stream.buffer.Pop(stream.buffer.Size()); } - publish_fetch_handler_->ForwardPublishedData(true, std::move(bytes)); + publish_fetch_handler_->ForwardPublishedData(true, 0, 0, std::move(bytes)); } else { - publish_fetch_handler_->ForwardPublishedData(false, std::move(data)); + publish_fetch_handler_->ForwardPublishedData(false, 0, 0, std::move(data)); } } diff --git a/src/peering/peer_manager.cc b/src/peering/peer_manager.cc index 86dae00..f4dd525 100644 --- a/src/peering/peer_manager.cc +++ b/src/peering/peer_manager.cc @@ -441,7 +441,7 @@ namespace laps::peering { entry.stream_id = stream_id; // Update SNS_ID if new stream header included or if datagram (both have sns_id) - if (eflags.new_stream || eflags.use_reliable == false) { + if (is_new_stream || eflags.use_reliable == false) { auto sns_id_bytes = BytesOf(entry.out_sns_id); std::copy(sns_id_bytes.rbegin(), sns_id_bytes.rend(), data_out.begin() + 2); } @@ -494,9 +494,6 @@ namespace laps::peering { case DataType::kNewStream: set_sns_id = true; eflags.use_reliable = true; - eflags.new_stream = true; - eflags.clear_tx_queue = true; - eflags.use_reset = true; break; } diff --git a/src/peering/peer_session.cc b/src/peering/peer_session.cc index 488b177..cd9970e 100644 --- a/src/peering/peer_session.cc +++ b/src/peering/peer_session.cc @@ -500,13 +500,6 @@ namespace laps::peering { data_header.Deserialize(*data); - // Set Any object if new stream - if (data_header.type == DataType::kNewStream) { - eflags.new_stream = true; - eflags.clear_tx_queue = true; - eflags.use_reset = true; - } - // Pipeline forward to other peers. Not all data may have been popped, so only forward popped data manager_.ForwardPeerData( GetSessionId(), true, stream_id.has_value() ? *stream_id : 0, data_header, data, hdr_len, eflags); diff --git a/src/publish_handler.h b/src/publish_handler.h index b7ed9f7..287a26c 100644 --- a/src/publish_handler.h +++ b/src/publish_handler.h @@ -24,13 +24,6 @@ namespace laps { bool pipeline_{ false }; // True indicates using pipeline forwarding, false is object forwarding - /** - * @brief Check if the first object has been sent or not - * - * @return True if one objet has been sent, False if no objects yet - */ - constexpr bool SentFirstObject() const noexcept { return latest_object_id_.has_value(); } - private: ClientManager& server_; }; diff --git a/src/subscribe_handler.cc b/src/subscribe_handler.cc index 5e3ef16..bdf0724 100644 --- a/src/subscribe_handler.cc +++ b/src/subscribe_handler.cc @@ -38,13 +38,11 @@ namespace laps { CacheObject object{ object_headers, { data.begin(), data.end() } }; - if (pending_new_group_request_id_.has_value() && current_group_id_ != object_headers.group_id) { + if (pending_new_group_request_id_.has_value() && + (object_headers.group_id == 0 || object_headers.group_id > *pending_new_group_request_id_)) { pending_new_group_request_id_.reset(); } - current_group_id_ = object_headers.group_id; - current_subgroup_id_ = object_headers.subgroup_id; - if (auto group = cache_entry.Get(object_headers.group_id)) { group->insert(std::move(object)); } else { @@ -87,25 +85,22 @@ namespace laps { { is_datagram_ = false; - if (stream_id > current_stream_id_) { - current_stream_id_ = stream_id; - } else if (stream_id < current_stream_id_) { - SPDLOG_DEBUG( - "Old stream data received, stream_id: {} is less than {}, ignoring", stream_id, current_stream_id_); - return; - } + auto& stream = streams_[stream_id]; + + // Pipeline forward immediately to subscribers/peers + // ForwardReceivedData(is_start, stream.current_group_id, stream.current_subgroup_id, data); // Process MoQ object from stream data - if (is_start || not stream_buffer_.AnyHasValue()) { - stream_buffer_.Clear(); + if (is_start) { + stream.buffer.Clear(); - stream_buffer_.InitAny(); - stream_buffer_.Push(*data); + stream.buffer.InitAny(); + stream.buffer.Push(*data); // Expect that on initial start of stream, there is enough data to process the stream headers - auto& s_hdr = stream_buffer_.GetAny(); - if (not(stream_buffer_ >> s_hdr)) { + auto& s_hdr = stream.buffer.GetAny(); + if (not(stream.buffer >> s_hdr)) { SPDLOG_ERROR("Not enough data to process new stream headers, stream is invalid"); // TODO: Add metrics to track this return; @@ -131,61 +126,59 @@ namespace laps { } updated_data->insert( - updated_data->end(), data->begin() + (data->size() - stream_buffer_.Size()), data->end()); + updated_data->end(), data->begin() + (data->size() - stream.buffer.Size()), data->end()); - ForwardReceivedData(true, updated_data); + ForwardReceivedData(is_start, stream.current_group_id, stream.current_subgroup_id, updated_data); } else { - ForwardReceivedData(is_start, data); + ForwardReceivedData(is_start, stream.current_group_id, stream.current_subgroup_id, data); } } else if (data) { - ForwardReceivedData(is_start, data); + ForwardReceivedData(is_start, stream.current_group_id, stream.current_subgroup_id, data); // Buffer for cache/full parse - stream_buffer_.Push(*data); + stream.buffer.Push(*data); } - auto& s_hdr = stream_buffer_.GetAny(); + auto& s_hdr = stream.buffer.GetAny(); while (true) { - if (not stream_buffer_.AnyHasValueB()) { - stream_buffer_.InitAnyB(); + if (not stream.buffer.AnyHasValueB()) { + stream.buffer.InitAnyB(); } - auto& obj = stream_buffer_.GetAnyB(); + auto& obj = stream.buffer.GetAnyB(); obj.stream_type = s_hdr.type; const auto subgroup_properties = quicr::messages::StreamHeaderProperties(s_hdr.type); - if (stream_buffer_ >> obj) { + if (stream.buffer >> obj) { subscribe_track_metrics_.objects_received++; - if (next_object_id_.has_value()) { - if (current_group_id_ != s_hdr.group_id || current_subgroup_id_ != s_hdr.subgroup_id) { - next_object_id_ = obj.object_delta; + if (stream.next_object_id.has_value()) { + if (stream.current_group_id != s_hdr.group_id || stream.current_group_id != s_hdr.subgroup_id) { + stream.next_object_id = obj.object_delta; } else { - *next_object_id_ += obj.object_delta; + *stream.next_object_id += obj.object_delta; } } else { - next_object_id_ = obj.object_delta; + stream.next_object_id = obj.object_delta; } - if (pending_new_group_request_id_.has_value() && current_group_id_ != s_hdr.group_id) { + if (pending_new_group_request_id_.has_value() && + (s_hdr.group_id == 0 || s_hdr.group_id > *pending_new_group_request_id_)) { pending_new_group_request_id_.reset(); } - current_group_id_ = s_hdr.group_id; - current_subgroup_id_ = s_hdr.subgroup_id.value(); - if (!s_hdr.subgroup_id.has_value()) { if (subgroup_properties.subgroup_id_type != quicr::messages::SubgroupIdType::kSetFromFirstObject) { SPDLOG_ERROR("Bad stream header type when no subgroup ID: {0}", static_cast(s_hdr.type)); return; } - s_hdr.subgroup_id = next_object_id_; + s_hdr.subgroup_id = stream.next_object_id; } ObjectReceived({ s_hdr.group_id, - next_object_id_.value(), + stream.next_object_id.value(), s_hdr.subgroup_id.value(), obj.payload.size(), obj.object_status, @@ -196,8 +189,8 @@ namespace laps { obj.immutable_extensions }, obj.payload); - *next_object_id_ += 1; - stream_buffer_.ResetAnyB(); + *stream.next_object_id += 1; + stream.buffer.ResetAnyB(); } break; // Not complete, wait for more data @@ -208,16 +201,13 @@ namespace laps { { is_datagram_ = true; - // Pipeline forward immediately to subscribers/peers - ForwardReceivedData(false, data); - - // Process MoQ object from stream data - stream_buffer_.Clear(); - - stream_buffer_.Push(*data); + dgram_buffer_.Clear(); + dgram_buffer_.Push(*data); quicr::messages::ObjectDatagram msg; - if (stream_buffer_ >> msg) { + if (dgram_buffer_ >> msg) { + ForwardReceivedData(false, msg.group_id, 0, data); + subscribe_track_metrics_.objects_received++; subscribe_track_metrics_.bytes_received += msg.payload.size(); ObjectReceived( @@ -237,6 +227,8 @@ namespace laps { } void SubscribeTrackHandler::ForwardReceivedData(bool is_new_stream, + uint64_t group_id, + uint64_t subgroup_id, std::shared_ptr> data) { auto self_connection_handle = GetConnectionId(); @@ -299,13 +291,14 @@ namespace laps { } const auto pub_track_h = sub_info.publish_handlers[self_connection_handle]; - if (is_new_stream && pub_track_h->SentFirstObject()) { + if (is_new_stream) { pub_track_h->pipeline_ = true; } else if (not pub_track_h->pipeline_) { continue; } - sub_info.publish_handlers[self_connection_handle]->ForwardPublishedData(is_new_stream, data); + sub_info.publish_handlers[self_connection_handle]->ForwardPublishedData( + is_new_stream, group_id, subgroup_id, data); } } diff --git a/src/subscribe_handler.h b/src/subscribe_handler.h index f395cdf..a2e6801 100644 --- a/src/subscribe_handler.h +++ b/src/subscribe_handler.h @@ -33,7 +33,6 @@ namespace laps { void SetFromPeer(); std::optional GetPendingNewRquestId() { return pending_new_group_request_id_; }; - uint64_t GetLatestGroupId() { return current_group_id_; } struct PublisherLastUpdateInfo { @@ -41,14 +40,14 @@ namespace laps { } pub_last_update_info_; private: - void ForwardReceivedData(bool is_new_stream, std::shared_ptr> data); + void ForwardReceivedData(bool is_new_stream, + uint64_t group_id, + uint64_t subgroup_id, + std::shared_ptr> data); ClientManager& server_; - uint64_t current_stream_id_{ 0 }; - bool is_datagram_{ false }; bool is_from_peer_{ false }; // Indicates that the subscribe handler was created by peer manager for recv data - quicr::StreamBuffer stream_buffer_; }; } // namespace laps From e942dc737caeabdb7d1ac113c50c77c8e40fe402 Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Tue, 27 Jan 2026 21:33:47 -0800 Subject: [PATCH 02/12] fix new group logic and subgroup compare --- src/client_manager.cc | 7 +++---- src/subscribe_handler.cc | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/client_manager.cc b/src/client_manager.cc index 40ed1f2..7c49756 100644 --- a/src/client_manager.cc +++ b/src/client_manager.cc @@ -1027,10 +1027,9 @@ namespace laps { break; } - if (!(it->second->GetPendingNewRquestId().has_value() && *it->second->GetPendingNewRquestId() == 0 && - group_id == 0) && - (group_id == 0 || (it->second->GetPendingNewRquestId().has_value() && - it->second->GetPendingNewRquestId().value() < group_id))) { + if (!it->second->GetPendingNewRquestId().has_value() || + (group_id == 0 && *it->second->GetPendingNewRquestId()) || + *it->second->GetPendingNewRquestId() < group_id) { it->second->SetNewGroupRequestId(group_id); DampenOrUpdateTrackSubscription(it->second, true); diff --git a/src/subscribe_handler.cc b/src/subscribe_handler.cc index bdf0724..70e4ae6 100644 --- a/src/subscribe_handler.cc +++ b/src/subscribe_handler.cc @@ -154,7 +154,7 @@ namespace laps { subscribe_track_metrics_.objects_received++; if (stream.next_object_id.has_value()) { - if (stream.current_group_id != s_hdr.group_id || stream.current_group_id != s_hdr.subgroup_id) { + if (stream.current_group_id != s_hdr.group_id || stream.current_subgroup_id != s_hdr.subgroup_id) { stream.next_object_id = obj.object_delta; } else { *stream.next_object_id += obj.object_delta; From 7259be614b88029875dec9ae43a28ce98af2c12d Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Tue, 27 Jan 2026 22:09:28 -0800 Subject: [PATCH 03/12] Fix stream data delta --- src/subscribe_handler.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/subscribe_handler.cc b/src/subscribe_handler.cc index 70e4ae6..fb5ad23 100644 --- a/src/subscribe_handler.cc +++ b/src/subscribe_handler.cc @@ -163,6 +163,9 @@ namespace laps { stream.next_object_id = obj.object_delta; } + stream.current_group_id = s_hdr.group_id; + stream.current_subgroup_id = s_hdr.subgroup_id.value(); + if (pending_new_group_request_id_.has_value() && (s_hdr.group_id == 0 || s_hdr.group_id > *pending_new_group_request_id_)) { pending_new_group_request_id_.reset(); From efc57168fdd582745bf2fa0470f91b294d7a6245 Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Wed, 28 Jan 2026 10:43:54 -0800 Subject: [PATCH 04/12] Fix pipeline to support subgroup streams instead of track --- src/publish_handler.cc | 15 ++++++++++++++- src/publish_handler.h | 3 ++- src/subscribe_handler.cc | 12 +++--------- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/publish_handler.cc b/src/publish_handler.cc index 409b7d5..af979f1 100644 --- a/src/publish_handler.cc +++ b/src/publish_handler.cc @@ -46,7 +46,7 @@ namespace laps { break; case Status::kPaused: reason = "paused"; - pipeline_ = false; + // TODO: Pause should likely clear out all subgroups in flight and start over fresh break; case Status::kSubscriptionUpdated: reason = "subscription updated"; @@ -76,4 +76,17 @@ namespace laps { metrics.quic.tx_queue_discards, metrics.quic.tx_queue_size.avg); } + + bool PublishTrackHandler::SentFirstObject(uint32_t group_id, uint32_t subgroup_id) + { + const auto group_it = stream_info_by_group_.find(group_id); + if (group_it != stream_info_by_group_.end()) { + const auto subgroup_it = group_it->second.find(subgroup_id); + if (subgroup_it != group_it->second.end()) { + return subgroup_it->second.last_object_id.has_value(); + } + } + + return false; + } } \ No newline at end of file diff --git a/src/publish_handler.h b/src/publish_handler.h index 287a26c..95d5df4 100644 --- a/src/publish_handler.h +++ b/src/publish_handler.h @@ -22,7 +22,8 @@ namespace laps { void StatusChanged(Status status) override; void MetricsSampled(const quicr::PublishTrackMetrics& metrics) override; - bool pipeline_{ false }; // True indicates using pipeline forwarding, false is object forwarding + // note: pipelining starts after the first object + bool SentFirstObject(uint32_t group_id, uint32_t subgroup_id); private: ClientManager& server_; diff --git a/src/subscribe_handler.cc b/src/subscribe_handler.cc index fb5ad23..fe657d2 100644 --- a/src/subscribe_handler.cc +++ b/src/subscribe_handler.cc @@ -64,14 +64,14 @@ namespace laps { continue; } - if (sub_info.publish_handlers[self_connection_handle]->pipeline_) { + if (sub_info.publish_handlers[self_connection_handle]->SentFirstObject(object_headers.group_id, + object_headers.subgroup_id)) { continue; } if (object.headers.group_id >= sub_info.start_location.group && object.headers.object_id >= sub_info.start_location.object) { sub_info.publish_handlers[self_connection_handle]->PublishObject(object_headers, data); - sub_info.publish_handlers[self_connection_handle]->pipeline_ = true; } } } catch (const std::exception& e) { @@ -87,9 +87,6 @@ namespace laps { auto& stream = streams_[stream_id]; - // Pipeline forward immediately to subscribers/peers - // ForwardReceivedData(is_start, stream.current_group_id, stream.current_subgroup_id, data); - // Process MoQ object from stream data if (is_start) { stream.buffer.Clear(); @@ -294,9 +291,7 @@ namespace laps { } const auto pub_track_h = sub_info.publish_handlers[self_connection_handle]; - if (is_new_stream) { - pub_track_h->pipeline_ = true; - } else if (not pub_track_h->pipeline_) { + if (not pub_track_h->SentFirstObject(group_id, subgroup_id)) { continue; } @@ -364,5 +359,4 @@ namespace laps { { is_from_peer_ = true; } - } \ No newline at end of file From 40ebcb51654ec26f2dde0dfdca424d4d116b7e82 Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Wed, 28 Jan 2026 15:31:14 -0800 Subject: [PATCH 05/12] Update libquicr --- dependencies/libquicr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dependencies/libquicr b/dependencies/libquicr index 2dadd7f..8d0f78c 160000 --- a/dependencies/libquicr +++ b/dependencies/libquicr @@ -1 +1 @@ -Subproject commit 2dadd7f114fac78c05b5836c6e8e4df97c0b5edc +Subproject commit 8d0f78caa8fad0d7e0b301abfac474375d8a8f03 From 9ed27fd66f288ff3fccf49c50fbaf56835487af8 Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Mon, 2 Feb 2026 14:46:01 -0800 Subject: [PATCH 06/12] update libquicr --- dependencies/libquicr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dependencies/libquicr b/dependencies/libquicr index 8d0f78c..4c2544b 160000 --- a/dependencies/libquicr +++ b/dependencies/libquicr @@ -1 +1 @@ -Subproject commit 8d0f78caa8fad0d7e0b301abfac474375d8a8f03 +Subproject commit 4c2544b6b8ab7065efcbaa8609922212c835fe7e From edec71e45f05c0a879ba2500e586b2a79c5edf7e Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Mon, 2 Feb 2026 15:36:25 -0800 Subject: [PATCH 07/12] libquicr fetch updates --- src/client_manager.cc | 10 ++++------ src/client_manager.h | 2 +- src/fetch_handler.cc | 14 +++----------- src/fetch_handler.h | 22 ++++++---------------- 4 files changed, 14 insertions(+), 34 deletions(-) diff --git a/src/client_manager.cc b/src/client_manager.cc index 7c49756..c8042be 100644 --- a/src/client_manager.cc +++ b/src/client_manager.cc @@ -771,7 +771,7 @@ namespace laps { quicr::messages::SubscriberPriority priority, quicr::messages::GroupOrder group_order, quicr::messages::Location start, - std::optional end) + std::optional end) { auto reason_code = quicr::FetchResponse::ReasonCode::kOk; std::optional largest_location = std::nullopt; @@ -821,7 +821,7 @@ namespace laps { start.group, start.object, end->group, - end->object, + end->object.value_or(0), largest_location.has_value() ? largest_location.value().group : 0); std::thread retrieve_cache_thread([=, cache_entries = std::move(cache_entries), this] { @@ -835,10 +835,8 @@ namespace laps { track_full_name, priority, group_order, - start.group, - end->group, - start.object, - end->object); + { .group = start.group, .object = start.object }, + { .group = end->group, .object = end->object }); quicr::ConnectionHandle pub_connection_handle = 0; diff --git a/src/client_manager.h b/src/client_manager.h index b196492..c93acbf 100644 --- a/src/client_manager.h +++ b/src/client_manager.h @@ -135,7 +135,7 @@ namespace laps { quicr::messages::SubscriberPriority priority, quicr::messages::GroupOrder group_order, quicr::messages::Location start, - std::optional end); + std::optional end); State& state_; const Config& config_; diff --git a/src/fetch_handler.cc b/src/fetch_handler.cc index a5d01c1..d077a80 100644 --- a/src/fetch_handler.cc +++ b/src/fetch_handler.cc @@ -11,17 +11,9 @@ namespace laps { const quicr::FullTrackName& full_track_name, quicr::messages::ObjectPriority priority, quicr::messages::GroupOrder group_order, - quicr::messages::GroupId start_group, - quicr::messages::GroupId end_group, - quicr::messages::GroupId start_object, - quicr::messages::GroupId end_object) - : quicr::FetchTrackHandler(full_track_name, - priority, - group_order, - start_group, - end_group, - start_object, - end_object) + const quicr::messages::Location& start_location, + const quicr::messages::FetchEndLocation& end_location) + : quicr::FetchTrackHandler(full_track_name, priority, group_order, start_location, end_location) , publish_fetch_handler_(std::move(publish_fetch_handler)) { } diff --git a/src/fetch_handler.h b/src/fetch_handler.h index 4ce7a25..190bdf0 100644 --- a/src/fetch_handler.h +++ b/src/fetch_handler.h @@ -17,10 +17,8 @@ namespace laps { const quicr::FullTrackName& full_track_name, quicr::messages::ObjectPriority priority, quicr::messages::GroupOrder group_order, - quicr::messages::GroupId start_group, - quicr::messages::GroupId end_group, - quicr::messages::GroupId start_object, - quicr::messages::GroupId end_object); + const quicr::messages::Location& start_location, + const quicr::messages::FetchEndLocation& end_location); public: static std::shared_ptr Create( @@ -28,19 +26,11 @@ namespace laps { const quicr::FullTrackName& full_track_name, quicr::messages::ObjectPriority priority, quicr::messages::GroupOrder group_order, - quicr::messages::GroupId start_group, - quicr::messages::GroupId end_group, - quicr::messages::GroupId start_object, - quicr::messages::GroupId end_object) + const quicr::messages::Location& start_location, + const quicr::messages::FetchEndLocation& end_location) { - return std::shared_ptr(new FetchTrackHandler(publish_fetch_handler, - full_track_name, - priority, - group_order, - start_group, - end_group, - start_object, - end_object)); + return std::shared_ptr(new FetchTrackHandler( + publish_fetch_handler, full_track_name, priority, group_order, start_location, end_location)); } void StatusChanged(Status status) override; From 14f69485e3ac9bd5df8baf78d5155d2c76e2b729 Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Tue, 3 Feb 2026 12:09:29 -0800 Subject: [PATCH 08/12] Fix fetch range filter to correctly return objects within range --- dependencies/libquicr | 2 +- src/client_manager.cc | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/dependencies/libquicr b/dependencies/libquicr index 4c2544b..0210f95 160000 --- a/dependencies/libquicr +++ b/dependencies/libquicr @@ -1 +1 @@ -Subproject commit 4c2544b6b8ab7065efcbaa8609922212c835fe7e +Subproject commit 0210f951052d2204fcc2e1598db644bdc8321210 diff --git a/src/client_manager.cc b/src/client_manager.cc index c8042be..d8d7af9 100644 --- a/src/client_manager.cc +++ b/src/client_manager.cc @@ -795,8 +795,7 @@ namespace laps { reason_code = quicr::FetchResponse::ReasonCode::kInvalidRange; } - const auto& cache_entries = - cache_entry_it->second.Get(start.group, end->group != 0 ? end->group : cache_entry_it->second.Size()); + const auto& cache_entries = cache_entry_it->second.Get(start.group, end->group); if (cache_entries.empty()) { reason_code = quicr::FetchResponse::ReasonCode::kNoObjects; @@ -931,12 +930,19 @@ namespace laps { return; } - if (end->object && object.headers.group_id == end->group && - object.headers.object_id >= end->object) { + // Start at start object id + if (start.group == object.headers.group_id && start.object > object.headers.object_id) { + continue; + } + + // Stop at end object, unless end object is zero + if (end->object.has_value() && object.headers.group_id == end->group && *end->object != 0 && + object.headers.object_id > *end->object) { return; } - SPDLOG_TRACE("Fetching group: {} object: {}", object.headers.group_id, object.headers.object_id); + SPDLOG_LOGGER_TRACE( + LOGGER, "Fetching group: {} object: {}", object.headers.group_id, object.headers.object_id); try { pub_fetch_h->PublishObject(object.headers, object.data); From 5ad93a7789ec5d3f4dabeee799f04fd89480ce22 Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Tue, 3 Feb 2026 12:23:04 -0800 Subject: [PATCH 09/12] Allow fetch end-object 0 to only be fetched. Use optional for all --- src/client_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client_manager.cc b/src/client_manager.cc index d8d7af9..72e5986 100644 --- a/src/client_manager.cc +++ b/src/client_manager.cc @@ -936,7 +936,7 @@ namespace laps { } // Stop at end object, unless end object is zero - if (end->object.has_value() && object.headers.group_id == end->group && *end->object != 0 && + if (end->object.has_value() && object.headers.group_id == end->group && object.headers.object_id > *end->object) { return; } From 38743c920fd62d1561ccbd37539ac4cdecc01f20 Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Tue, 3 Feb 2026 12:41:44 -0800 Subject: [PATCH 10/12] Update libquicr --- dependencies/libquicr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dependencies/libquicr b/dependencies/libquicr index 0210f95..6b8a217 160000 --- a/dependencies/libquicr +++ b/dependencies/libquicr @@ -1 +1 @@ -Subproject commit 0210f951052d2204fcc2e1598db644bdc8321210 +Subproject commit 6b8a21740733ba48b0da6fcae055a4740dde5081 From ec7b9b7403c953a7fa2a241909523cdbeca64c9a Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Tue, 3 Feb 2026 13:05:19 -0800 Subject: [PATCH 11/12] update libquicr to latest main --- dependencies/libquicr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dependencies/libquicr b/dependencies/libquicr index 6b8a217..25cb5da 160000 --- a/dependencies/libquicr +++ b/dependencies/libquicr @@ -1 +1 @@ -Subproject commit 6b8a21740733ba48b0da6fcae055a4740dde5081 +Subproject commit 25cb5dacb8125994670ae4cc5620d97978473e44 From 8f8c178cae28744912d3a50951047058bedb659d Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Tue, 3 Feb 2026 21:15:17 -0800 Subject: [PATCH 12/12] Remove fetch end optional --- src/client_manager.cc | 29 +++++++++++++---------------- src/client_manager.h | 2 +- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/src/client_manager.cc b/src/client_manager.cc index 72e5986..e689a30 100644 --- a/src/client_manager.cc +++ b/src/client_manager.cc @@ -771,7 +771,7 @@ namespace laps { quicr::messages::SubscriberPriority priority, quicr::messages::GroupOrder group_order, quicr::messages::Location start, - std::optional end) + quicr::messages::FetchEndLocation end) { auto reason_code = quicr::FetchResponse::ReasonCode::kOk; std::optional largest_location = std::nullopt; @@ -791,11 +791,11 @@ namespace laps { reason_code = quicr::FetchResponse::ReasonCode::kNoObjects; } - if (start.group > end->group || largest_location.value().group < start.group) { + if (start.group > end.group || largest_location.value().group < start.group) { reason_code = quicr::FetchResponse::ReasonCode::kInvalidRange; } - const auto& cache_entries = cache_entry_it->second.Get(start.group, end->group); + const auto& cache_entries = cache_entry_it->second.Get(start.group, end.group); if (cache_entries.empty()) { reason_code = quicr::FetchResponse::ReasonCode::kNoObjects; @@ -808,10 +808,6 @@ namespace laps { stop_fetch_.try_emplace({ connection_handle, request_id }, false); - if (!end.has_value()) { - end = { 0, 0 }; - } - SPDLOG_LOGGER_DEBUG(LOGGER, "Fetch received conn_id: {} request_id: {} range start group: {} start object: {} end " "group: {} end object: {} largest_location: {}", @@ -819,8 +815,8 @@ namespace laps { request_id, start.group, start.object, - end->group, - end->object.value_or(0), + end.group, + end.object.value_or(0), largest_location.has_value() ? largest_location.value().group : 0); std::thread retrieve_cache_thread([=, cache_entries = std::move(cache_entries), this] { @@ -835,7 +831,7 @@ namespace laps { priority, group_order, { .group = start.group, .object = start.object }, - { .group = end->group, .object = end->object }); + { .group = end.group, .object = end.object }); quicr::ConnectionHandle pub_connection_handle = 0; @@ -936,8 +932,8 @@ namespace laps { } // Stop at end object, unless end object is zero - if (end->object.has_value() && object.headers.group_id == end->group && - object.headers.object_id > *end->object) { + if (end.object.has_value() && object.headers.group_id == end.group && + object.headers.object_id > *end.object) { return; } @@ -976,11 +972,12 @@ namespace laps { const quicr::messages::JoiningFetchAttributes& attributes) { uint64_t joining_start = 0; + std::optional largest_location = GetLargestAvailable(track_full_name); if (attributes.relative) { - if (const auto largest = GetLargestAvailable(track_full_name)) { - if (largest->group > attributes.joining_start) - joining_start = largest->group - attributes.joining_start; + if (largest_location.has_value()) { + if (largest_location->group > attributes.joining_start) + joining_start = largest_location->group - attributes.joining_start; } } else { joining_start = attributes.joining_start; @@ -992,7 +989,7 @@ namespace laps { attributes.priority, attributes.group_order, { joining_start, 0 }, - std::nullopt); + { largest_location->group, std::nullopt }); } void ClientManager::FetchCancelReceived(quicr::ConnectionHandle connection_handle, uint64_t request_id) diff --git a/src/client_manager.h b/src/client_manager.h index c93acbf..b047243 100644 --- a/src/client_manager.h +++ b/src/client_manager.h @@ -135,7 +135,7 @@ namespace laps { quicr::messages::SubscriberPriority priority, quicr::messages::GroupOrder group_order, quicr::messages::Location start, - std::optional end); + quicr::messages::FetchEndLocation end); State& state_; const Config& config_;