diff --git a/CMakeLists.txt b/CMakeLists.txt index 1ea512d..6b726de 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,7 +15,7 @@ set (CMAKE_POLICY_VERSION_MINIMUM 3.5) ### Define the project name and version ### Version will be added to version output of apps and library project(laps - VERSION 0.14.40 + VERSION 0.14.41 DESCRIPTION "Latency Aware Publish/Subscribe" LANGUAGES CXX) diff --git a/dependencies/libquicr b/dependencies/libquicr index d6deca9..25cb5da 160000 --- a/dependencies/libquicr +++ b/dependencies/libquicr @@ -1 +1 @@ -Subproject commit d6deca9a30c55e31054bcd5d7681ba33cfb93231 +Subproject commit 25cb5dacb8125994670ae4cc5620d97978473e44 diff --git a/src/client_manager.cc b/src/client_manager.cc index 3adfe53..e689a30 100644 --- a/src/client_manager.cc +++ b/src/client_manager.cc @@ -771,7 +771,7 @@ namespace laps { quicr::messages::SubscriberPriority priority, quicr::messages::GroupOrder group_order, quicr::messages::Location start, - std::optional end) + quicr::messages::FetchEndLocation end) { auto reason_code = quicr::FetchResponse::ReasonCode::kOk; std::optional largest_location = std::nullopt; @@ -791,12 +791,11 @@ namespace laps { reason_code = quicr::FetchResponse::ReasonCode::kNoObjects; } - if (start.group > end->group || largest_location.value().group < start.group) { + if (start.group > end.group || largest_location.value().group < start.group) { reason_code = quicr::FetchResponse::ReasonCode::kInvalidRange; } - const auto& cache_entries = - cache_entry_it->second.Get(start.group, end->group != 0 ? end->group : cache_entry_it->second.Size()); + const auto& cache_entries = cache_entry_it->second.Get(start.group, end.group); if (cache_entries.empty()) { reason_code = quicr::FetchResponse::ReasonCode::kNoObjects; @@ -809,10 +808,6 @@ namespace laps { stop_fetch_.try_emplace({ connection_handle, request_id }, false); - if (!end.has_value()) { - end = { 0, 0 }; - } - SPDLOG_LOGGER_DEBUG(LOGGER, "Fetch received conn_id: {} request_id: {} range start group: {} start object: {} end " "group: {} end object: {} largest_location: {}", @@ -820,8 +815,8 @@ namespace laps { request_id, start.group, start.object, - end->group, - end->object, + end.group, + end.object.value_or(0), largest_location.has_value() ? largest_location.value().group : 0); std::thread retrieve_cache_thread([=, cache_entries = std::move(cache_entries), this] { @@ -835,10 +830,8 @@ namespace laps { track_full_name, priority, group_order, - start.group, - end->group, - start.object, - end->object); + { .group = start.group, .object = start.object }, + { .group = end.group, .object = end.object }); quicr::ConnectionHandle pub_connection_handle = 0; @@ -933,12 +926,19 @@ namespace laps { return; } - if (end->object && object.headers.group_id == end->group && - object.headers.object_id >= end->object) { + // Start at start object id + if (start.group == object.headers.group_id && start.object > object.headers.object_id) { + continue; + } + + // Stop at end object, unless end object is zero + if (end.object.has_value() && object.headers.group_id == end.group && + object.headers.object_id > *end.object) { return; } - SPDLOG_TRACE("Fetching group: {} object: {}", object.headers.group_id, object.headers.object_id); + SPDLOG_LOGGER_TRACE( + LOGGER, "Fetching group: {} object: {}", object.headers.group_id, object.headers.object_id); try { pub_fetch_h->PublishObject(object.headers, object.data); @@ -972,11 +972,12 @@ namespace laps { const quicr::messages::JoiningFetchAttributes& attributes) { uint64_t joining_start = 0; + std::optional largest_location = GetLargestAvailable(track_full_name); if (attributes.relative) { - if (const auto largest = GetLargestAvailable(track_full_name)) { - if (largest->group > attributes.joining_start) - joining_start = largest->group - attributes.joining_start; + if (largest_location.has_value()) { + if (largest_location->group > attributes.joining_start) + joining_start = largest_location->group - attributes.joining_start; } } else { joining_start = attributes.joining_start; @@ -988,7 +989,7 @@ namespace laps { attributes.priority, attributes.group_order, { joining_start, 0 }, - std::nullopt); + { largest_location->group, std::nullopt }); } void ClientManager::FetchCancelReceived(quicr::ConnectionHandle connection_handle, uint64_t request_id) @@ -1027,9 +1028,9 @@ namespace laps { break; } - if (!(it->second->GetPendingNewRquestId().has_value() && *it->second->GetPendingNewRquestId() == 0 && - group_id == 0) && - (group_id == 0 || it->second->GetLatestGroupId() < group_id)) { + if (!it->second->GetPendingNewRquestId().has_value() || + (group_id == 0 && *it->second->GetPendingNewRquestId()) || + *it->second->GetPendingNewRquestId() < group_id) { it->second->SetNewGroupRequestId(group_id); DampenOrUpdateTrackSubscription(it->second, true); diff --git a/src/client_manager.h b/src/client_manager.h index b196492..b047243 100644 --- a/src/client_manager.h +++ b/src/client_manager.h @@ -135,7 +135,7 @@ namespace laps { quicr::messages::SubscriberPriority priority, quicr::messages::GroupOrder group_order, quicr::messages::Location start, - std::optional end); + quicr::messages::FetchEndLocation end); State& state_; const Config& config_; diff --git a/src/fetch_handler.cc b/src/fetch_handler.cc index 68f994b..d077a80 100644 --- a/src/fetch_handler.cc +++ b/src/fetch_handler.cc @@ -11,17 +11,9 @@ namespace laps { const quicr::FullTrackName& full_track_name, quicr::messages::ObjectPriority priority, quicr::messages::GroupOrder group_order, - quicr::messages::GroupId start_group, - quicr::messages::GroupId end_group, - quicr::messages::GroupId start_object, - quicr::messages::GroupId end_object) - : quicr::FetchTrackHandler(full_track_name, - priority, - group_order, - start_group, - end_group, - start_object, - end_object) + const quicr::messages::Location& start_location, + const quicr::messages::FetchEndLocation& end_location) + : quicr::FetchTrackHandler(full_track_name, priority, group_order, start_location, end_location) , publish_fetch_handler_(std::move(publish_fetch_handler)) { } @@ -31,32 +23,32 @@ namespace laps { std::shared_ptr> data) { subscribe_track_metrics_.bytes_received += data->size(); + auto& stream = streams_[stream_id]; if (is_start) { - // Process the fetch header and update it so that it works for the requestor - stream_buffer_.Clear(); + stream.buffer.Clear(); - stream_buffer_.InitAny(); - stream_buffer_.Push(*data); + stream.buffer.InitAny(); + stream.buffer.Push(*data); // Expect that on initial start of stream, there is enough data to process the stream headers - auto f_hdr = stream_buffer_.GetAny(); - if (not(stream_buffer_ >> f_hdr)) { + auto f_hdr = stream.buffer.GetAny(); + if (not(stream.buffer >> f_hdr)) { SPDLOG_ERROR("Not enough data to process new stream headers, stream is invalid len: {} / {}", - stream_buffer_.Size(), + stream.buffer.Size(), data->size()); // TODO: Add metrics to track this return; } - size_t header_size = data->size() - stream_buffer_.Size(); + size_t header_size = data->size() - stream.buffer.Size(); SPDLOG_DEBUG("Fetch header added in rid: {} out rid: {} data sz: {} sbuf_size: {} header size: {}", f_hdr.request_id, *publish_fetch_handler_->GetRequestId(), data->size(), - stream_buffer_.Size(), + stream.buffer.Size(), header_size); f_hdr.request_id = *publish_fetch_handler_->GetRequestId(); @@ -65,12 +57,12 @@ namespace laps { if (header_size < data->size()) { bytes->insert(bytes->end(), data->begin() + header_size, data->end()); - stream_buffer_.Pop(stream_buffer_.Size()); + stream.buffer.Pop(stream.buffer.Size()); } - publish_fetch_handler_->ForwardPublishedData(true, std::move(bytes)); + publish_fetch_handler_->ForwardPublishedData(true, 0, 0, std::move(bytes)); } else { - publish_fetch_handler_->ForwardPublishedData(false, std::move(data)); + publish_fetch_handler_->ForwardPublishedData(false, 0, 0, std::move(data)); } } diff --git a/src/fetch_handler.h b/src/fetch_handler.h index 4ce7a25..190bdf0 100644 --- a/src/fetch_handler.h +++ b/src/fetch_handler.h @@ -17,10 +17,8 @@ namespace laps { const quicr::FullTrackName& full_track_name, quicr::messages::ObjectPriority priority, quicr::messages::GroupOrder group_order, - quicr::messages::GroupId start_group, - quicr::messages::GroupId end_group, - quicr::messages::GroupId start_object, - quicr::messages::GroupId end_object); + const quicr::messages::Location& start_location, + const quicr::messages::FetchEndLocation& end_location); public: static std::shared_ptr Create( @@ -28,19 +26,11 @@ namespace laps { const quicr::FullTrackName& full_track_name, quicr::messages::ObjectPriority priority, quicr::messages::GroupOrder group_order, - quicr::messages::GroupId start_group, - quicr::messages::GroupId end_group, - quicr::messages::GroupId start_object, - quicr::messages::GroupId end_object) + const quicr::messages::Location& start_location, + const quicr::messages::FetchEndLocation& end_location) { - return std::shared_ptr(new FetchTrackHandler(publish_fetch_handler, - full_track_name, - priority, - group_order, - start_group, - end_group, - start_object, - end_object)); + return std::shared_ptr(new FetchTrackHandler( + publish_fetch_handler, full_track_name, priority, group_order, start_location, end_location)); } void StatusChanged(Status status) override; diff --git a/src/peering/peer_manager.cc b/src/peering/peer_manager.cc index 86dae00..f4dd525 100644 --- a/src/peering/peer_manager.cc +++ b/src/peering/peer_manager.cc @@ -441,7 +441,7 @@ namespace laps::peering { entry.stream_id = stream_id; // Update SNS_ID if new stream header included or if datagram (both have sns_id) - if (eflags.new_stream || eflags.use_reliable == false) { + if (is_new_stream || eflags.use_reliable == false) { auto sns_id_bytes = BytesOf(entry.out_sns_id); std::copy(sns_id_bytes.rbegin(), sns_id_bytes.rend(), data_out.begin() + 2); } @@ -494,9 +494,6 @@ namespace laps::peering { case DataType::kNewStream: set_sns_id = true; eflags.use_reliable = true; - eflags.new_stream = true; - eflags.clear_tx_queue = true; - eflags.use_reset = true; break; } diff --git a/src/peering/peer_session.cc b/src/peering/peer_session.cc index 488b177..cd9970e 100644 --- a/src/peering/peer_session.cc +++ b/src/peering/peer_session.cc @@ -500,13 +500,6 @@ namespace laps::peering { data_header.Deserialize(*data); - // Set Any object if new stream - if (data_header.type == DataType::kNewStream) { - eflags.new_stream = true; - eflags.clear_tx_queue = true; - eflags.use_reset = true; - } - // Pipeline forward to other peers. Not all data may have been popped, so only forward popped data manager_.ForwardPeerData( GetSessionId(), true, stream_id.has_value() ? *stream_id : 0, data_header, data, hdr_len, eflags); diff --git a/src/publish_handler.cc b/src/publish_handler.cc index 409b7d5..af979f1 100644 --- a/src/publish_handler.cc +++ b/src/publish_handler.cc @@ -46,7 +46,7 @@ namespace laps { break; case Status::kPaused: reason = "paused"; - pipeline_ = false; + // TODO: Pause should likely clear out all subgroups in flight and start over fresh break; case Status::kSubscriptionUpdated: reason = "subscription updated"; @@ -76,4 +76,17 @@ namespace laps { metrics.quic.tx_queue_discards, metrics.quic.tx_queue_size.avg); } + + bool PublishTrackHandler::SentFirstObject(uint32_t group_id, uint32_t subgroup_id) + { + const auto group_it = stream_info_by_group_.find(group_id); + if (group_it != stream_info_by_group_.end()) { + const auto subgroup_it = group_it->second.find(subgroup_id); + if (subgroup_it != group_it->second.end()) { + return subgroup_it->second.last_object_id.has_value(); + } + } + + return false; + } } \ No newline at end of file diff --git a/src/publish_handler.h b/src/publish_handler.h index b7ed9f7..95d5df4 100644 --- a/src/publish_handler.h +++ b/src/publish_handler.h @@ -22,14 +22,8 @@ namespace laps { void StatusChanged(Status status) override; void MetricsSampled(const quicr::PublishTrackMetrics& metrics) override; - bool pipeline_{ false }; // True indicates using pipeline forwarding, false is object forwarding - - /** - * @brief Check if the first object has been sent or not - * - * @return True if one objet has been sent, False if no objects yet - */ - constexpr bool SentFirstObject() const noexcept { return latest_object_id_.has_value(); } + // note: pipelining starts after the first object + bool SentFirstObject(uint32_t group_id, uint32_t subgroup_id); private: ClientManager& server_; diff --git a/src/subscribe_handler.cc b/src/subscribe_handler.cc index 5e3ef16..fe657d2 100644 --- a/src/subscribe_handler.cc +++ b/src/subscribe_handler.cc @@ -38,13 +38,11 @@ namespace laps { CacheObject object{ object_headers, { data.begin(), data.end() } }; - if (pending_new_group_request_id_.has_value() && current_group_id_ != object_headers.group_id) { + if (pending_new_group_request_id_.has_value() && + (object_headers.group_id == 0 || object_headers.group_id > *pending_new_group_request_id_)) { pending_new_group_request_id_.reset(); } - current_group_id_ = object_headers.group_id; - current_subgroup_id_ = object_headers.subgroup_id; - if (auto group = cache_entry.Get(object_headers.group_id)) { group->insert(std::move(object)); } else { @@ -66,14 +64,14 @@ namespace laps { continue; } - if (sub_info.publish_handlers[self_connection_handle]->pipeline_) { + if (sub_info.publish_handlers[self_connection_handle]->SentFirstObject(object_headers.group_id, + object_headers.subgroup_id)) { continue; } if (object.headers.group_id >= sub_info.start_location.group && object.headers.object_id >= sub_info.start_location.object) { sub_info.publish_handlers[self_connection_handle]->PublishObject(object_headers, data); - sub_info.publish_handlers[self_connection_handle]->pipeline_ = true; } } } catch (const std::exception& e) { @@ -87,25 +85,19 @@ namespace laps { { is_datagram_ = false; - if (stream_id > current_stream_id_) { - current_stream_id_ = stream_id; - } else if (stream_id < current_stream_id_) { - SPDLOG_DEBUG( - "Old stream data received, stream_id: {} is less than {}, ignoring", stream_id, current_stream_id_); - return; - } + auto& stream = streams_[stream_id]; // Process MoQ object from stream data - if (is_start || not stream_buffer_.AnyHasValue()) { - stream_buffer_.Clear(); + if (is_start) { + stream.buffer.Clear(); - stream_buffer_.InitAny(); - stream_buffer_.Push(*data); + stream.buffer.InitAny(); + stream.buffer.Push(*data); // Expect that on initial start of stream, there is enough data to process the stream headers - auto& s_hdr = stream_buffer_.GetAny(); - if (not(stream_buffer_ >> s_hdr)) { + auto& s_hdr = stream.buffer.GetAny(); + if (not(stream.buffer >> s_hdr)) { SPDLOG_ERROR("Not enough data to process new stream headers, stream is invalid"); // TODO: Add metrics to track this return; @@ -131,61 +123,62 @@ namespace laps { } updated_data->insert( - updated_data->end(), data->begin() + (data->size() - stream_buffer_.Size()), data->end()); + updated_data->end(), data->begin() + (data->size() - stream.buffer.Size()), data->end()); - ForwardReceivedData(true, updated_data); + ForwardReceivedData(is_start, stream.current_group_id, stream.current_subgroup_id, updated_data); } else { - ForwardReceivedData(is_start, data); + ForwardReceivedData(is_start, stream.current_group_id, stream.current_subgroup_id, data); } } else if (data) { - ForwardReceivedData(is_start, data); + ForwardReceivedData(is_start, stream.current_group_id, stream.current_subgroup_id, data); // Buffer for cache/full parse - stream_buffer_.Push(*data); + stream.buffer.Push(*data); } - auto& s_hdr = stream_buffer_.GetAny(); + auto& s_hdr = stream.buffer.GetAny(); while (true) { - if (not stream_buffer_.AnyHasValueB()) { - stream_buffer_.InitAnyB(); + if (not stream.buffer.AnyHasValueB()) { + stream.buffer.InitAnyB(); } - auto& obj = stream_buffer_.GetAnyB(); + auto& obj = stream.buffer.GetAnyB(); obj.stream_type = s_hdr.type; const auto subgroup_properties = quicr::messages::StreamHeaderProperties(s_hdr.type); - if (stream_buffer_ >> obj) { + if (stream.buffer >> obj) { subscribe_track_metrics_.objects_received++; - if (next_object_id_.has_value()) { - if (current_group_id_ != s_hdr.group_id || current_subgroup_id_ != s_hdr.subgroup_id) { - next_object_id_ = obj.object_delta; + if (stream.next_object_id.has_value()) { + if (stream.current_group_id != s_hdr.group_id || stream.current_subgroup_id != s_hdr.subgroup_id) { + stream.next_object_id = obj.object_delta; } else { - *next_object_id_ += obj.object_delta; + *stream.next_object_id += obj.object_delta; } } else { - next_object_id_ = obj.object_delta; + stream.next_object_id = obj.object_delta; } - if (pending_new_group_request_id_.has_value() && current_group_id_ != s_hdr.group_id) { + stream.current_group_id = s_hdr.group_id; + stream.current_subgroup_id = s_hdr.subgroup_id.value(); + + if (pending_new_group_request_id_.has_value() && + (s_hdr.group_id == 0 || s_hdr.group_id > *pending_new_group_request_id_)) { pending_new_group_request_id_.reset(); } - current_group_id_ = s_hdr.group_id; - current_subgroup_id_ = s_hdr.subgroup_id.value(); - if (!s_hdr.subgroup_id.has_value()) { if (subgroup_properties.subgroup_id_type != quicr::messages::SubgroupIdType::kSetFromFirstObject) { SPDLOG_ERROR("Bad stream header type when no subgroup ID: {0}", static_cast(s_hdr.type)); return; } - s_hdr.subgroup_id = next_object_id_; + s_hdr.subgroup_id = stream.next_object_id; } ObjectReceived({ s_hdr.group_id, - next_object_id_.value(), + stream.next_object_id.value(), s_hdr.subgroup_id.value(), obj.payload.size(), obj.object_status, @@ -196,8 +189,8 @@ namespace laps { obj.immutable_extensions }, obj.payload); - *next_object_id_ += 1; - stream_buffer_.ResetAnyB(); + *stream.next_object_id += 1; + stream.buffer.ResetAnyB(); } break; // Not complete, wait for more data @@ -208,16 +201,13 @@ namespace laps { { is_datagram_ = true; - // Pipeline forward immediately to subscribers/peers - ForwardReceivedData(false, data); - - // Process MoQ object from stream data - stream_buffer_.Clear(); - - stream_buffer_.Push(*data); + dgram_buffer_.Clear(); + dgram_buffer_.Push(*data); quicr::messages::ObjectDatagram msg; - if (stream_buffer_ >> msg) { + if (dgram_buffer_ >> msg) { + ForwardReceivedData(false, msg.group_id, 0, data); + subscribe_track_metrics_.objects_received++; subscribe_track_metrics_.bytes_received += msg.payload.size(); ObjectReceived( @@ -237,6 +227,8 @@ namespace laps { } void SubscribeTrackHandler::ForwardReceivedData(bool is_new_stream, + uint64_t group_id, + uint64_t subgroup_id, std::shared_ptr> data) { auto self_connection_handle = GetConnectionId(); @@ -299,13 +291,12 @@ namespace laps { } const auto pub_track_h = sub_info.publish_handlers[self_connection_handle]; - if (is_new_stream && pub_track_h->SentFirstObject()) { - pub_track_h->pipeline_ = true; - } else if (not pub_track_h->pipeline_) { + if (not pub_track_h->SentFirstObject(group_id, subgroup_id)) { continue; } - sub_info.publish_handlers[self_connection_handle]->ForwardPublishedData(is_new_stream, data); + sub_info.publish_handlers[self_connection_handle]->ForwardPublishedData( + is_new_stream, group_id, subgroup_id, data); } } @@ -368,5 +359,4 @@ namespace laps { { is_from_peer_ = true; } - } \ No newline at end of file diff --git a/src/subscribe_handler.h b/src/subscribe_handler.h index f395cdf..a2e6801 100644 --- a/src/subscribe_handler.h +++ b/src/subscribe_handler.h @@ -33,7 +33,6 @@ namespace laps { void SetFromPeer(); std::optional GetPendingNewRquestId() { return pending_new_group_request_id_; }; - uint64_t GetLatestGroupId() { return current_group_id_; } struct PublisherLastUpdateInfo { @@ -41,14 +40,14 @@ namespace laps { } pub_last_update_info_; private: - void ForwardReceivedData(bool is_new_stream, std::shared_ptr> data); + void ForwardReceivedData(bool is_new_stream, + uint64_t group_id, + uint64_t subgroup_id, + std::shared_ptr> data); ClientManager& server_; - uint64_t current_stream_id_{ 0 }; - bool is_datagram_{ false }; bool is_from_peer_{ false }; // Indicates that the subscribe handler was created by peer manager for recv data - quicr::StreamBuffer stream_buffer_; }; } // namespace laps