From 58def77a017407d4019ed05c6b1af6924d11b6cb Mon Sep 17 00:00:00 2001 From: Rich Logan Date: Fri, 30 Jan 2026 16:36:06 +0000 Subject: [PATCH] Updates for FETCH API change --- src/client_manager.cc | 48 ++++++++++++++++++++++--------------------- src/client_manager.h | 4 ++-- src/fetch_handler.cc | 14 +++---------- src/fetch_handler.h | 18 ++++++---------- 4 files changed, 36 insertions(+), 48 deletions(-) diff --git a/src/client_manager.cc b/src/client_manager.cc index 7c49756..69b1f12 100644 --- a/src/client_manager.cc +++ b/src/client_manager.cc @@ -770,8 +770,8 @@ namespace laps { const quicr::FullTrackName& track_full_name, quicr::messages::SubscriberPriority priority, quicr::messages::GroupOrder group_order, - quicr::messages::Location start, - std::optional end) + const quicr::messages::Location& start, + const quicr::messages::FetchEndLocation& end) { auto reason_code = quicr::FetchResponse::ReasonCode::kOk; std::optional largest_location = std::nullopt; @@ -791,12 +791,11 @@ namespace laps { reason_code = quicr::FetchResponse::ReasonCode::kNoObjects; } - if (start.group > end->group || largest_location.value().group < start.group) { + if (start.group > end.group || largest_location.value().group < start.group) { reason_code = quicr::FetchResponse::ReasonCode::kInvalidRange; } - const auto& cache_entries = - cache_entry_it->second.Get(start.group, end->group != 0 ? end->group : cache_entry_it->second.Size()); + const auto& cache_entries = cache_entry_it->second.Get(start.group, end.group); if (cache_entries.empty()) { reason_code = quicr::FetchResponse::ReasonCode::kNoObjects; @@ -809,10 +808,6 @@ namespace laps { stop_fetch_.try_emplace({ connection_handle, request_id }, false); - if (!end.has_value()) { - end = { 0, 0 }; - } - SPDLOG_LOGGER_DEBUG(LOGGER, "Fetch received conn_id: {} request_id: {} range start group: {} start object: {} end " "group: {} end object: {} largest_location: {}", @@ -820,8 +815,8 @@ namespace laps { request_id, start.group, start.object, - end->group, - end->object, + end.group, + end.object.has_value() ? std::to_string(*end.object) : "to_end", largest_location.has_value() ? largest_location.value().group : 0); std::thread retrieve_cache_thread([=, cache_entries = std::move(cache_entries), this] { @@ -831,14 +826,8 @@ namespace laps { if (rc != quicr::FetchResponse::ReasonCode::kOk) { // Try to see if original publisher can provide the data - auto track_handler = FetchTrackHandler::Create(pub_fetch_h, - track_full_name, - priority, - group_order, - start.group, - end->group, - start.object, - end->object); + auto track_handler = + FetchTrackHandler::Create(pub_fetch_h, track_full_name, priority, group_order, start, end); quicr::ConnectionHandle pub_connection_handle = 0; @@ -933,8 +922,15 @@ namespace laps { return; } - if (end->object && object.headers.group_id == end->group && - object.headers.object_id >= end->object) { + // When intra-group, skip any objects prior to start. + if (start.group == end.group && object.headers.group_id == start.group && + object.headers.object_id < start.object) { + continue; + } + + // Are we done? (end location is inclusive) + if (end.object.has_value() && object.headers.group_id == end.group && + object.headers.object_id > *end.object) { return; } @@ -972,9 +968,10 @@ namespace laps { const quicr::messages::JoiningFetchAttributes& attributes) { uint64_t joining_start = 0; + const auto largest = GetLargestAvailable(track_full_name); if (attributes.relative) { - if (const auto largest = GetLargestAvailable(track_full_name)) { + if (largest.has_value()) { if (largest->group > attributes.joining_start) joining_start = largest->group - attributes.joining_start; } @@ -982,13 +979,18 @@ namespace laps { joining_start = attributes.joining_start; } + // For joining fetch, fetch from joining_start to the largest available group + const quicr::messages::FetchEndLocation end_location = { + largest.has_value() ? largest->group : joining_start, std::nullopt + }; + FetchReceived(connection_handle, request_id, track_full_name, attributes.priority, attributes.group_order, { joining_start, 0 }, - std::nullopt); + end_location); } void ClientManager::FetchCancelReceived(quicr::ConnectionHandle connection_handle, uint64_t request_id) diff --git a/src/client_manager.h b/src/client_manager.h index b196492..83ad1e5 100644 --- a/src/client_manager.h +++ b/src/client_manager.h @@ -134,8 +134,8 @@ namespace laps { const quicr::FullTrackName& track_full_name, quicr::messages::SubscriberPriority priority, quicr::messages::GroupOrder group_order, - quicr::messages::Location start, - std::optional end); + const quicr::messages::Location& start, + const quicr::messages::FetchEndLocation& end); State& state_; const Config& config_; diff --git a/src/fetch_handler.cc b/src/fetch_handler.cc index a5d01c1..d077a80 100644 --- a/src/fetch_handler.cc +++ b/src/fetch_handler.cc @@ -11,17 +11,9 @@ namespace laps { const quicr::FullTrackName& full_track_name, quicr::messages::ObjectPriority priority, quicr::messages::GroupOrder group_order, - quicr::messages::GroupId start_group, - quicr::messages::GroupId end_group, - quicr::messages::GroupId start_object, - quicr::messages::GroupId end_object) - : quicr::FetchTrackHandler(full_track_name, - priority, - group_order, - start_group, - end_group, - start_object, - end_object) + const quicr::messages::Location& start_location, + const quicr::messages::FetchEndLocation& end_location) + : quicr::FetchTrackHandler(full_track_name, priority, group_order, start_location, end_location) , publish_fetch_handler_(std::move(publish_fetch_handler)) { } diff --git a/src/fetch_handler.h b/src/fetch_handler.h index 4ce7a25..ae30972 100644 --- a/src/fetch_handler.h +++ b/src/fetch_handler.h @@ -17,10 +17,8 @@ namespace laps { const quicr::FullTrackName& full_track_name, quicr::messages::ObjectPriority priority, quicr::messages::GroupOrder group_order, - quicr::messages::GroupId start_group, - quicr::messages::GroupId end_group, - quicr::messages::GroupId start_object, - quicr::messages::GroupId end_object); + const quicr::messages::Location& start_location, + const quicr::messages::FetchEndLocation& end_location); public: static std::shared_ptr Create( @@ -28,19 +26,15 @@ namespace laps { const quicr::FullTrackName& full_track_name, quicr::messages::ObjectPriority priority, quicr::messages::GroupOrder group_order, - quicr::messages::GroupId start_group, - quicr::messages::GroupId end_group, - quicr::messages::GroupId start_object, - quicr::messages::GroupId end_object) + const quicr::messages::Location& start_location, + const quicr::messages::FetchEndLocation& end_location) { return std::shared_ptr(new FetchTrackHandler(publish_fetch_handler, full_track_name, priority, group_order, - start_group, - end_group, - start_object, - end_object)); + start_location, + end_location)); } void StatusChanged(Status status) override;