diff --git a/CMakeLists.txt b/CMakeLists.txt index 6b726de..12e86ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,7 +15,7 @@ set (CMAKE_POLICY_VERSION_MINIMUM 3.5) ### Define the project name and version ### Version will be added to version output of apps and library project(laps - VERSION 0.14.41 + VERSION 0.16.0 DESCRIPTION "Latency Aware Publish/Subscribe" LANGUAGES CXX) diff --git a/dependencies/libquicr b/dependencies/libquicr index 25cb5da..8bb3d8d 160000 --- a/dependencies/libquicr +++ b/dependencies/libquicr @@ -1 +1 @@ -Subproject commit 25cb5dacb8125994670ae4cc5620d97978473e44 +Subproject commit 8bb3d8d349823bbb5652271dfed50f8920e8b82e diff --git a/src/client_manager.cc b/src/client_manager.cc index e689a30..ac15229 100644 --- a/src/client_manager.cc +++ b/src/client_manager.cc @@ -35,14 +35,27 @@ namespace laps { std::vector ClientManager::PublishNamespaceDoneReceived( quicr::ConnectionHandle connection_handle, - const quicr::TrackNamespace& track_namespace) + uint64_t request_id) { - auto th = quicr::TrackHash({ track_namespace, {} }); + + auto it = state_.requests.find({ connection_handle, request_id }); + if (it == state_.requests.end()) { + SPDLOG_LOGGER_DEBUG( + LOGGER, + "Received publish namespace done from connection handle: {0} request_id: {1} but request Id not in state", + connection_handle, + request_id); + + return {}; + } SPDLOG_LOGGER_DEBUG(LOGGER, - "Received publish namespace done from connection handle: {0} for namespace hash: {1}", + "Received publish namespace done from connection handle: {0} request_id: {1}", connection_handle, - th.track_namespace_hash); + request_id); + + auto& track_namespace = std::any_cast(it->second.related_data); + auto th = quicr::TrackHash({ track_namespace, {} }); // TODO: Fix O(prefix namespaces) matching std::vector sub_namespace_connections; @@ -61,7 +74,7 @@ namespace laps { } } - ResolvePublishNamespaceDone(connection_handle, track_namespace, sub_namespace_connections); + ResolvePublishNamespaceDone(connection_handle, request_id, sub_namespace_connections); for (auto track_alias : state_.namespace_active[{ track_namespace, connection_handle }]) { auto ptd = state_.pub_subscribes[{ track_alias, connection_handle }]; @@ -135,6 +148,11 @@ namespace laps { const quicr::PublishNamespaceAttributes& attrs) { + auto [req_it, _] = state_.requests.try_emplace({ connection_handle, attrs.request_id }, + State::RequestTransaction::Type::kPublishNamespace, + State::RequestTransaction::State::kOk); + req_it->second.related_data.emplace(track_namespace); + auto subscribe_to_publisher = [&] { auto& anno_tracks = state_.namespace_active[{ track_namespace, connection_handle }]; @@ -249,6 +267,7 @@ namespace laps { attrs.track_alias = publish_attributes.track_alias; attrs.dynamic_groups = publish_attributes.dynamic_groups; attrs.forward = publish_attributes.forward; + attrs.filter_type = publish_attributes.filter_type; /* if (state_.pub_subscribes.contains({ th.track_fullname_hash, connection_handle })) { @@ -265,10 +284,11 @@ namespace laps { return; }*/ - SPDLOG_INFO("Received publish from connection handle: {} using track alias: {} request_id: {}", - connection_handle, - th.track_fullname_hash, - request_id); + SPDLOG_LOGGER_INFO(LOGGER, + "Received publish from connection handle: {} using track alias: {} request_id: {}", + connection_handle, + th.track_fullname_hash, + request_id); quicr::PublishResponse publish_response; publish_response.reason_code = quicr::PublishResponse::ReasonCode::kOk; @@ -319,9 +339,10 @@ namespace laps { } if (not has_subs) { - SPDLOG_INFO("No subscribers, pause publish connection handle: {0} using track alias: {1}", - connection_handle, - th.track_fullname_hash); + SPDLOG_LOGGER_INFO(LOGGER, + "No subscribers, pause publish connection handle: {0} using track alias: {1}", + connection_handle, + th.track_fullname_hash); sub_track_handler->Pause(); } @@ -338,6 +359,7 @@ namespace laps { } void ClientManager::SubscribeNamespaceReceived(quicr::ConnectionHandle connection_handle, + quicr::DataContextId data_ctx_id, const quicr::TrackNamespace& prefix_namespace, const quicr::SubscribeNamespaceAttributes& attributes) { @@ -379,7 +401,8 @@ namespace laps { publish_attributes.track_alias = ta_conn.first; publish_attributes.priority = handler->GetPriority(); // Original priority? publish_attributes.group_order = handler->GetGroupOrder(); - publish_attributes.delivery_timeout = handler->GetDeliveryTimeout(); + publish_attributes.delivery_timeout = + handler->GetDeliveryTimeout().value_or(std::chrono::milliseconds(kDefaultObjectTtl)); publish_attributes.filter_type = handler->GetFilterType(); publish_attributes.forward = true; publish_attributes.new_group_request_id = std::nullopt; @@ -399,10 +422,11 @@ namespace laps { quicr::SubscribeNamespaceResponse::ReasonCode::kOk, .tracks = std::move(matched_tracks), .namespaces = std::move(matched_ns) }; - ResolveSubscribeNamespace(connection_handle, attributes.request_id, prefix_namespace, response); + ResolveSubscribeNamespace(connection_handle, data_ctx_id, attributes.request_id, prefix_namespace, response); } void ClientManager::UnsubscribeNamespaceReceived(quicr::ConnectionHandle connection_handle, + [[maybe_unused]] quicr::DataContextId data_ctx_id, const quicr::TrackNamespace& prefix_namespace) { auto it = state_.subscribes_namespaces.find(prefix_namespace); @@ -646,8 +670,7 @@ namespace laps { void ClientManager::TrackStatusReceived(quicr::ConnectionHandle connection_handle, uint64_t request_id, - const quicr::FullTrackName& track_full_name, - const quicr::messages::SubscribeAttributes& subscribe_attributes) + const quicr::FullTrackName& track_full_name) { auto th = quicr::TrackHash(track_full_name); @@ -670,9 +693,8 @@ namespace laps { if (it->first.second != connection_handle) { ResolveTrackStatus(connection_handle, request_id, - th.track_fullname_hash, { - quicr::SubscribeResponse::ReasonCode::kOk, + quicr::RequestResponse::ReasonCode::kOk, it->second->IsPublisherInitiated(), std::nullopt, largest, @@ -683,9 +705,8 @@ namespace laps { ResolveTrackStatus(connection_handle, request_id, - th.track_fullname_hash, { - quicr::SubscribeResponse::ReasonCode::kTrackDoesNotExist, + quicr::RequestResponse::ReasonCode::kDoesNotExist, false, "Track does not exist", std::nullopt, @@ -704,7 +725,7 @@ namespace laps { ResolveSubscribe(connection_handle, request_id, th.track_fullname_hash, - { .reason_code = quicr::SubscribeResponse::ReasonCode::kNotSupported, + { .reason_code = quicr::RequestResponse::ReasonCode::kNotSupported, .is_publisher_initiated = false, .error_reason = "Duplicate subscribe" }); @@ -734,10 +755,10 @@ namespace laps { connection_handle, request_id, th.track_fullname_hash, - { quicr::SubscribeResponse::ReasonCode::kOk, attrs.is_publisher_initiated, std::nullopt, largest }); + { quicr::RequestResponse::ReasonCode::kOk, attrs.is_publisher_initiated, std::nullopt, largest }); } else { ResolveSubscribe( - connection_handle, request_id, th.track_fullname_hash, { quicr::SubscribeResponse::ReasonCode::kOk }); + connection_handle, request_id, th.track_fullname_hash, { quicr::RequestResponse::ReasonCode::kOk }); } ProcessSubscribe(connection_handle, request_id, th, track_full_name, attrs, largest); @@ -768,7 +789,7 @@ namespace laps { void ClientManager::FetchReceived(quicr::ConnectionHandle connection_handle, uint64_t request_id, const quicr::FullTrackName& track_full_name, - quicr::messages::SubscriberPriority priority, + uint8_t priority, quicr::messages::GroupOrder group_order, quicr::messages::Location start, quicr::messages::FetchEndLocation end) @@ -1012,6 +1033,7 @@ namespace laps { kDefaultPriority, quicr::messages::GroupOrder::kAscending, std::chrono::milliseconds(kDefaultObjectTtl), + std::chrono::milliseconds(0), quicr::messages::FilterType::kLargestObject, 1, true, @@ -1137,17 +1159,13 @@ namespace laps { start_location }); // Always send updates to peers to support subscribe updates and refresh group support - quicr::messages::Subscribe sub( - request_id, - track_full_name.name_space, - track_full_name.name, - attrs.priority, - attrs.group_order, - true, - quicr::messages::FilterType::kLargestObject, // Filters are only for edge to apply - std::nullopt, - std::nullopt, - {}); + auto params = + quicr::messages::Parameters{} + .Add(quicr::messages::ParameterType::kSubscriberPriority, attrs.priority) + .Add(quicr::messages::ParameterType::kGroupOrder, attrs.group_order) + .Add(quicr::messages::ParameterType::kSubscriptionFilter, quicr::messages::FilterType::kLargestObject); + + quicr::messages::Subscribe sub(request_id, track_full_name.name_space, track_full_name.name, params); // TODO: Current new group is not sent by client in subscribe. It's only in subscribe updates. diff --git a/src/client_manager.h b/src/client_manager.h index b047243..3013d4f 100644 --- a/src/client_manager.h +++ b/src/client_manager.h @@ -59,15 +59,17 @@ namespace laps { const ConnectionRemoteInfo& remote) override; void SubscribeNamespaceReceived(quicr::ConnectionHandle connection_handle, + quicr::DataContextId data_ctx_id, const quicr::TrackNamespace& prefix_namespace, const quicr::SubscribeNamespaceAttributes& attributes) override; void UnsubscribeNamespaceReceived(quicr::ConnectionHandle connection_handle, + quicr::DataContextId data_ctx_id, const quicr::TrackNamespace& prefix_namespace) override; std::vector PublishNamespaceDoneReceived( quicr::ConnectionHandle connection_handle, - const quicr::TrackNamespace& track_namespace) override; + quicr::messages::RequestID request_id) override; void PublishNamespaceReceived(quicr::ConnectionHandle connection_handle, const quicr::TrackNamespace& track_namespace, @@ -90,8 +92,7 @@ namespace laps { void TrackStatusReceived(quicr::ConnectionHandle connection_handle, uint64_t request_id, - const quicr::FullTrackName& track_full_name, - const quicr::messages::SubscribeAttributes& subscribe_attributes) override; + const quicr::FullTrackName& track_full_name) override; std::optional GetLargestAvailable(const quicr::FullTrackName& track_name); @@ -132,7 +133,7 @@ namespace laps { void FetchReceived(quicr::ConnectionHandle connection_handle, uint64_t request_id, const quicr::FullTrackName& track_full_name, - quicr::messages::SubscriberPriority priority, + uint8_t priority, quicr::messages::GroupOrder group_order, quicr::messages::Location start, quicr::messages::FetchEndLocation end); diff --git a/src/peering/peer_manager.cc b/src/peering/peer_manager.cc index f4dd525..da6fa14 100644 --- a/src/peering/peer_manager.cc +++ b/src/peering/peer_manager.cc @@ -102,20 +102,11 @@ namespace laps::peering { if (not withdraw) { uint64_t update_ref = rand(); - quicr::messages::Subscribe sub( - [](quicr::messages::Subscribe& msg) { - if (msg.filter_type == quicr::messages::FilterType::kAbsoluteStart || - msg.filter_type == quicr::messages::FilterType::kAbsoluteRange) { - msg.group_0 = std::make_optional(); - } - }, - [](quicr::messages::Subscribe& msg) { - if (msg.filter_type == quicr::messages::FilterType::kAbsoluteRange) { - msg.group_1 = std::make_optional(); - } - }); + quicr::messages::Subscribe sub; subscribe_info.subscribe_data >> sub; + auto priority = sub.parameters.Get(quicr::messages::ParameterType::kSubscriberPriority); + std::lock_guard _(state_.state_mutex); bool announce_matches{ false }; for (auto it = state_.pub_subscribes.lower_bound({ subscribe_info.track_hash.track_fullname_hash, 0 }); @@ -176,10 +167,8 @@ namespace laps::peering { subscribe_info.source_node_id, peer_session->GetSessionId()); - if (auto [sns_id, is_new] = - peer_session->AddSubscribeSourceNode(subscribe_info.track_hash.track_fullname_hash, - subscribe_info.source_node_id, - sub.subscriber_priority); + if (auto [sns_id, is_new] = peer_session->AddSubscribeSourceNode( + subscribe_info.track_hash.track_fullname_hash, subscribe_info.source_node_id, priority); is_new) { SPDLOG_LOGGER_INFO( LOGGER, @@ -269,11 +258,16 @@ namespace laps::peering { sess.second->SendAnnounceInfo(announce_info, withdraw); } + /* + * TODO: Track namespace as request ID works internally between peering and client managers to support + * stateless tracking of namespaces to request IDs. Might need to revisit this + */ if (!announce_info.name.size()) { // PUBLISH_NAMESPACE if (!withdraw) { - client_manager_->PublishNamespaceReceived(0, announce_info.name_space, { .request_id = 0 }); + client_manager_->PublishNamespaceReceived( + 0, announce_info.name_space, { .request_id = th.track_namespace_hash }); } else { - client_manager_->PublishNamespaceDoneReceived(0, announce_info.name_space); + client_manager_->PublishNamespaceDoneReceived(0, th.track_namespace_hash); } } else { // PUBLISH if (!withdraw) { @@ -526,18 +520,7 @@ namespace laps::peering { void PeerManager::ClientUnsubscribe(uint64_t track_fullname_hash) { if (auto si = info_base_->GetSubscribe(track_fullname_hash, node_info_.id)) { - quicr::messages::Subscribe sub( - [](quicr::messages::Subscribe& msg) { - if (msg.filter_type == quicr::messages::FilterType::kAbsoluteStart || - msg.filter_type == quicr::messages::FilterType::kAbsoluteRange) { - msg.group_0 = std::make_optional(); - } - }, - [](quicr::messages::Subscribe& msg) { - if (msg.filter_type == quicr::messages::FilterType::kAbsoluteRange) { - msg.group_1 = std::make_optional(); - } - }); + quicr::messages::Subscribe sub; si->subscribe_data >> sub; info_base_->RemoveSubscribe(*si); @@ -566,18 +549,7 @@ namespace laps::peering { auto tfn = track_full_name; auto th = quicr::TrackHash(tfn); - quicr::messages::Subscribe sub( - [](quicr::messages::Subscribe& msg) { - if (msg.filter_type == quicr::messages::FilterType::kAbsoluteStart || - msg.filter_type == quicr::messages::FilterType::kAbsoluteRange) { - msg.group_0 = std::make_optional(); - } - }, - [](quicr::messages::Subscribe& msg) { - if (msg.filter_type == quicr::messages::FilterType::kAbsoluteRange) { - msg.group_1 = std::make_optional(); - } - }); + quicr::messages::Subscribe sub; // Check for existing subscribe and update if (auto si = info_base_->GetSubscribe(th.track_fullname_hash, node_info_.id)) { @@ -585,6 +557,14 @@ namespace laps::peering { // Update subscription params with new group request si->subscribe_data >> sub; + std::optional new_group_request_id; + if (sub.parameters.Contains(quicr::messages::ParameterType::kNewGroupRequest)) { + new_group_request_id = + sub.parameters.Get(quicr::messages::ParameterType::kNewGroupRequest); + } else { + sub.parameters.Add(quicr::messages::ParameterType::kNewGroupRequest, attrs.new_group_request_id); + } + bool has_new_group_request = false; for (auto it = sub.parameters.begin(); it != sub.parameters.end(); ++it) { if (it->type == quicr::messages::ParameterType::kNewGroupRequest) { @@ -592,7 +572,7 @@ namespace laps::peering { if (!attrs.new_group_request_id.has_value()) { // Remove new group request since it's not requested but was found - sub.parameters.erase(it); + sub.parameters.parameters.erase(it); quicr::Bytes sub_data; sub_data << sub; @@ -605,9 +585,7 @@ namespace laps::peering { } if (attrs.new_group_request_id.has_value() && not has_new_group_request) { - const auto val = BytesOf(attrs.new_group_request_id.value()); - sub.parameters.push_back({ .type = quicr::messages::ParameterType::kNewGroupRequest, - .value = { val.begin(), val.end() } }); + sub.parameters.Add(quicr::messages::ParameterType::kNewGroupRequest, attrs.new_group_request_id); quicr::Bytes sub_data; sub_data << sub; @@ -650,18 +628,7 @@ namespace laps::peering { auto tfn = track_full_name; auto th = quicr::TrackHash(tfn); - quicr::messages::Subscribe sub( - [](quicr::messages::Subscribe& msg) { - if (msg.filter_type == quicr::messages::FilterType::kAbsoluteStart || - msg.filter_type == quicr::messages::FilterType::kAbsoluteRange) { - msg.group_0 = std::make_optional(); - } - }, - [](quicr::messages::Subscribe& msg) { - if (msg.filter_type == quicr::messages::FilterType::kAbsoluteRange) { - msg.group_1 = std::make_optional(); - } - }); + quicr::messages::Subscribe sub; if (subscribe_data.empty()) return; // Empty means it was supposed to be an update which didn't happen @@ -759,18 +726,7 @@ namespace laps::peering { if (si_it.first == node_info_.id) continue; const auto& sub_info = si_it.second; - quicr::messages::Subscribe sub( - [](quicr::messages::Subscribe& msg) { - if (msg.filter_type == quicr::messages::FilterType::kAbsoluteStart || - msg.filter_type == quicr::messages::FilterType::kAbsoluteRange) { - msg.group_0 = std::make_optional(); - } - }, - [](quicr::messages::Subscribe& msg) { - if (msg.filter_type == quicr::messages::FilterType::kAbsoluteRange) { - msg.group_1 = std::make_optional(); - } - }); + quicr::messages::Subscribe sub; try { sub_info.subscribe_data >> sub; @@ -818,10 +774,10 @@ namespace laps::peering { sub_info.source_node_id, peer_session->GetSessionId()); - if (auto [sns_id, is_new] = - peer_session->AddSubscribeSourceNode(sub_info.track_hash.track_fullname_hash, - sub_info.source_node_id, - sub.subscriber_priority); + if (auto [sns_id, is_new] = peer_session->AddSubscribeSourceNode( + sub_info.track_hash.track_fullname_hash, + sub_info.source_node_id, + sub.parameters.Get(quicr::messages::ParameterType::kSubscriberPriority)); is_new) { SPDLOG_LOGGER_INFO( LOGGER, diff --git a/src/state.h b/src/state.h index 3c7b342..ec45bce 100644 --- a/src/state.h +++ b/src/state.h @@ -13,6 +13,34 @@ namespace laps { { std::mutex state_mutex; + /** + * Request Transaction struct to state track the active request + */ + struct RequestTransaction + { + enum class Type : uint8_t + { + kSubscribeNamespace, + kPublishNamespace, + }; + + enum class State : uint8_t + { + kOk, + kPendingOk, + kError, + }; + + Type type; ///< Type of request + State state; ///< State of the request + std::any related_data; ///< Related data based on the type + }; + + /** + * Active requests by connection handle and request ID + */ + std::map, RequestTransaction> requests; + /** * Map of subscribes (e.g., track alias) matched to a publish namespace * diff --git a/src/subscribe_handler.cc b/src/subscribe_handler.cc index fe657d2..5c2688d 100644 --- a/src/subscribe_handler.cc +++ b/src/subscribe_handler.cc @@ -95,7 +95,6 @@ namespace laps { stream.buffer.Push(*data); // Expect that on initial start of stream, there is enough data to process the stream headers - auto& s_hdr = stream.buffer.GetAny(); if (not(stream.buffer >> s_hdr)) { SPDLOG_ERROR("Not enough data to process new stream headers, stream is invalid"); @@ -108,26 +107,15 @@ namespace laps { s_hdr.track_alias = GetTrackAlias().value(); quicr::Bytes updated_s_hdr; - bool zero_subgroup_id{ false }; - const auto properties = quicr::messages::StreamHeaderProperties(s_hdr.type); - if (properties.subgroup_id_type != quicr::messages::SubgroupIdType::kExplicit) { - s_hdr.subgroup_id = std::nullopt; - zero_subgroup_id = true; - } - auto updated_data = std::make_shared>(); *updated_data << s_hdr; - if (zero_subgroup_id) { - s_hdr.subgroup_id = 0; - } - updated_data->insert( updated_data->end(), data->begin() + (data->size() - stream.buffer.Size()), data->end()); - ForwardReceivedData(is_start, stream.current_group_id, stream.current_subgroup_id, updated_data); + ForwardReceivedData(is_start, s_hdr.group_id, s_hdr.subgroup_id.value_or(0), updated_data); } else { - ForwardReceivedData(is_start, stream.current_group_id, stream.current_subgroup_id, data); + ForwardReceivedData(is_start, s_hdr.group_id, s_hdr.subgroup_id.value_or(0), data); } } else if (data) { @@ -145,8 +133,7 @@ namespace laps { } auto& obj = stream.buffer.GetAnyB(); - obj.stream_type = s_hdr.type; - const auto subgroup_properties = quicr::messages::StreamHeaderProperties(s_hdr.type); + obj.properties.emplace(*s_hdr.properties); if (stream.buffer >> obj) { subscribe_track_metrics_.objects_received++; @@ -169,11 +156,10 @@ namespace laps { } if (!s_hdr.subgroup_id.has_value()) { - if (subgroup_properties.subgroup_id_type != quicr::messages::SubgroupIdType::kSetFromFirstObject) { - SPDLOG_ERROR("Bad stream header type when no subgroup ID: {0}", - static_cast(s_hdr.type)); - return; + if (obj.properties->subgroup_id_mode != quicr::messages::SubgroupIdType::kSetFromFirstObject) { + throw quicr::messages::ProtocolViolationException("Subgoup ID mismatch"); } + // Set the subgroup ID from the first object ID. s_hdr.subgroup_id = stream.next_object_id; } @@ -251,7 +237,7 @@ namespace laps { if (is_new_stream) { d_type = peering::DataType::kNewStream; - if (GetDeliveryTimeout().count() == 0) { + if (GetDeliveryTimeout().value_or(std::chrono::milliseconds(kDefaultObjectTtl)).count() == 0) { // Use default if delivery timeout is not set SetDeliveryTimeout(std::chrono::milliseconds(server_.config_.object_ttl_)); } @@ -259,8 +245,11 @@ namespace laps { } if (not is_from_peer_) { - server_.peer_manager_.ClientDataRecv( - *track_alias, GetPriority(), GetDeliveryTimeout().count(), d_type, data); + server_.peer_manager_.ClientDataRecv(*track_alias, + GetPriority(), + GetDeliveryTimeout().value_or(std::chrono::milliseconds(0)).count(), + d_type, + data); } // Fanout object to subscribers