Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ set (CMAKE_POLICY_VERSION_MINIMUM 3.5)
### Define the project name and version
### Version will be added to version output of apps and library
project(laps
VERSION 0.14.41
VERSION 0.16.0
DESCRIPTION "Latency Aware Publish/Subscribe"
LANGUAGES CXX)

Expand Down
2 changes: 1 addition & 1 deletion dependencies/libquicr
Submodule libquicr updated 50 files
+75 −28 .github/workflows/cmake.yml
+1 −1 CMakeLists.txt
+0 −1 c-bridge/examples/chat.c
+0 −1 c-bridge/examples/file_transfer.c
+0 −1 c-bridge/examples/simple_publisher.c
+9 −7 c-bridge/include/quicr/quicr_bridge.h
+62 −29 c-bridge/src/quicr_bridge.cpp
+136 −49 cmd/examples/client.cpp
+16 −13 cmd/examples/helper_functions.h
+33 −50 include/quicr/client.h
+52 −7 include/quicr/common.h
+1 −0 include/quicr/detail/attributes.h
+18 −4 include/quicr/detail/base_track_handler.h
+355 −61 include/quicr/detail/ctrl_message_types.h
+2 −11 include/quicr/detail/data_storage.h
+152 −354 include/quicr/detail/messages.h
+4 −4 include/quicr/detail/quic_transport.h
+179 −98 include/quicr/detail/transport.h
+2 −2 include/quicr/fetch_track_handler.h
+135 −0 include/quicr/publish_namespace_handler.h
+14 −19 include/quicr/publish_track_handler.h
+19 −19 include/quicr/server.h
+18 −2 include/quicr/subscribe_namespace_handler.h
+15 −7 include/quicr/subscribe_track_handler.h
+2 −1 src/CMakeLists.txt
+40 −0 src/base_track_handler.cpp
+273 −517 src/client.cpp
+56 −32 src/ctrl_message_types.cpp
+114 −59 src/messages.cpp
+94 −0 src/publish_namespace_handler.cpp
+23 −8 src/publish_track_handler.cpp
+218 −503 src/server.cpp
+6 −5 src/subscribe_namespace_handler.cpp
+60 −37 src/subscribe_track_handler.cpp
+515 −622 src/transport.cpp
+22 −29 src/transport_picoquic.cpp
+0 −1 src/transport_picoquic.h
+76 −56 test/integration_test/integration_test.cpp
+2 −2 test/integration_test/test_client.cpp
+3 −3 test/integration_test/test_client.h
+9 −6 test/integration_test/test_server.cpp
+8 −5 test/integration_test/test_server.h
+206 −606 test/moq_ctrl_messages.cpp
+196 −143 test/moq_data_messages.cpp
+8 −4 test/track_handlers.cpp
+1 −1 tools/draft_parser/README.md
+6,104 −0 tools/draft_parser/drafts/draft-ietf-moq-transport-16.txt
+6,113 −0 tools/draft_parser/drafts/draft-ietf-moq-transport-16_edited.txt
+8 −9 tools/draft_parser/main.py
+2 −2 tools/draft_parser/moqt_parser/message_spec.py
88 changes: 53 additions & 35 deletions src/client_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,27 @@ namespace laps {

std::vector<quicr::ConnectionHandle> ClientManager::PublishNamespaceDoneReceived(
quicr::ConnectionHandle connection_handle,
const quicr::TrackNamespace& track_namespace)
uint64_t request_id)
{
auto th = quicr::TrackHash({ track_namespace, {} });

auto it = state_.requests.find({ connection_handle, request_id });
if (it == state_.requests.end()) {
SPDLOG_LOGGER_DEBUG(
LOGGER,
"Received publish namespace done from connection handle: {0} request_id: {1} but request Id not in state",
connection_handle,
request_id);

return {};
}

SPDLOG_LOGGER_DEBUG(LOGGER,
"Received publish namespace done from connection handle: {0} for namespace hash: {1}",
"Received publish namespace done from connection handle: {0} request_id: {1}",
connection_handle,
th.track_namespace_hash);
request_id);

auto& track_namespace = std::any_cast<quicr::TrackNamespace&>(it->second.related_data);
auto th = quicr::TrackHash({ track_namespace, {} });

// TODO: Fix O(prefix namespaces) matching
std::vector<quicr::ConnectionHandle> sub_namespace_connections;
Expand All @@ -61,7 +74,7 @@ namespace laps {
}
}

ResolvePublishNamespaceDone(connection_handle, track_namespace, sub_namespace_connections);
ResolvePublishNamespaceDone(connection_handle, request_id, sub_namespace_connections);

for (auto track_alias : state_.namespace_active[{ track_namespace, connection_handle }]) {
auto ptd = state_.pub_subscribes[{ track_alias, connection_handle }];
Expand Down Expand Up @@ -135,6 +148,11 @@ namespace laps {
const quicr::PublishNamespaceAttributes& attrs)
{

auto [req_it, _] = state_.requests.try_emplace({ connection_handle, attrs.request_id },
State::RequestTransaction::Type::kPublishNamespace,
State::RequestTransaction::State::kOk);
req_it->second.related_data.emplace<quicr::TrackNamespace>(track_namespace);

auto subscribe_to_publisher = [&] {
auto& anno_tracks = state_.namespace_active[{ track_namespace, connection_handle }];

Expand Down Expand Up @@ -249,6 +267,7 @@ namespace laps {
attrs.track_alias = publish_attributes.track_alias;
attrs.dynamic_groups = publish_attributes.dynamic_groups;
attrs.forward = publish_attributes.forward;
attrs.filter_type = publish_attributes.filter_type;

/*
if (state_.pub_subscribes.contains({ th.track_fullname_hash, connection_handle })) {
Expand All @@ -265,10 +284,11 @@ namespace laps {
return;
}*/

SPDLOG_INFO("Received publish from connection handle: {} using track alias: {} request_id: {}",
connection_handle,
th.track_fullname_hash,
request_id);
SPDLOG_LOGGER_INFO(LOGGER,
"Received publish from connection handle: {} using track alias: {} request_id: {}",
connection_handle,
th.track_fullname_hash,
request_id);

quicr::PublishResponse publish_response;
publish_response.reason_code = quicr::PublishResponse::ReasonCode::kOk;
Expand Down Expand Up @@ -319,9 +339,10 @@ namespace laps {
}

if (not has_subs) {
SPDLOG_INFO("No subscribers, pause publish connection handle: {0} using track alias: {1}",
connection_handle,
th.track_fullname_hash);
SPDLOG_LOGGER_INFO(LOGGER,
"No subscribers, pause publish connection handle: {0} using track alias: {1}",
connection_handle,
th.track_fullname_hash);

sub_track_handler->Pause();
}
Expand All @@ -338,6 +359,7 @@ namespace laps {
}

void ClientManager::SubscribeNamespaceReceived(quicr::ConnectionHandle connection_handle,
quicr::DataContextId data_ctx_id,
const quicr::TrackNamespace& prefix_namespace,
const quicr::SubscribeNamespaceAttributes& attributes)
{
Expand Down Expand Up @@ -379,7 +401,8 @@ namespace laps {
publish_attributes.track_alias = ta_conn.first;
publish_attributes.priority = handler->GetPriority(); // Original priority?
publish_attributes.group_order = handler->GetGroupOrder();
publish_attributes.delivery_timeout = handler->GetDeliveryTimeout();
publish_attributes.delivery_timeout =
handler->GetDeliveryTimeout().value_or(std::chrono::milliseconds(kDefaultObjectTtl));
publish_attributes.filter_type = handler->GetFilterType();
publish_attributes.forward = true;
publish_attributes.new_group_request_id = std::nullopt;
Expand All @@ -399,10 +422,11 @@ namespace laps {
quicr::SubscribeNamespaceResponse::ReasonCode::kOk,
.tracks = std::move(matched_tracks),
.namespaces = std::move(matched_ns) };
ResolveSubscribeNamespace(connection_handle, attributes.request_id, prefix_namespace, response);
ResolveSubscribeNamespace(connection_handle, data_ctx_id, attributes.request_id, prefix_namespace, response);
}

void ClientManager::UnsubscribeNamespaceReceived(quicr::ConnectionHandle connection_handle,
[[maybe_unused]] quicr::DataContextId data_ctx_id,
const quicr::TrackNamespace& prefix_namespace)
{
auto it = state_.subscribes_namespaces.find(prefix_namespace);
Expand Down Expand Up @@ -646,8 +670,7 @@ namespace laps {

void ClientManager::TrackStatusReceived(quicr::ConnectionHandle connection_handle,
uint64_t request_id,
const quicr::FullTrackName& track_full_name,
const quicr::messages::SubscribeAttributes& subscribe_attributes)
const quicr::FullTrackName& track_full_name)
{
auto th = quicr::TrackHash(track_full_name);

Expand All @@ -670,9 +693,8 @@ namespace laps {
if (it->first.second != connection_handle) {
ResolveTrackStatus(connection_handle,
request_id,
th.track_fullname_hash,
{
quicr::SubscribeResponse::ReasonCode::kOk,
quicr::RequestResponse::ReasonCode::kOk,
it->second->IsPublisherInitiated(),
std::nullopt,
largest,
Expand All @@ -683,9 +705,8 @@ namespace laps {

ResolveTrackStatus(connection_handle,
request_id,
th.track_fullname_hash,
{
quicr::SubscribeResponse::ReasonCode::kTrackDoesNotExist,
quicr::RequestResponse::ReasonCode::kDoesNotExist,
false,
"Track does not exist",
std::nullopt,
Expand All @@ -704,7 +725,7 @@ namespace laps {
ResolveSubscribe(connection_handle,
request_id,
th.track_fullname_hash,
{ .reason_code = quicr::SubscribeResponse::ReasonCode::kNotSupported,
{ .reason_code = quicr::RequestResponse::ReasonCode::kNotSupported,
.is_publisher_initiated = false,
.error_reason = "Duplicate subscribe" });

Expand Down Expand Up @@ -734,10 +755,10 @@ namespace laps {
connection_handle,
request_id,
th.track_fullname_hash,
{ quicr::SubscribeResponse::ReasonCode::kOk, attrs.is_publisher_initiated, std::nullopt, largest });
{ quicr::RequestResponse::ReasonCode::kOk, attrs.is_publisher_initiated, std::nullopt, largest });
} else {
ResolveSubscribe(
connection_handle, request_id, th.track_fullname_hash, { quicr::SubscribeResponse::ReasonCode::kOk });
connection_handle, request_id, th.track_fullname_hash, { quicr::RequestResponse::ReasonCode::kOk });
}

ProcessSubscribe(connection_handle, request_id, th, track_full_name, attrs, largest);
Expand Down Expand Up @@ -768,7 +789,7 @@ namespace laps {
void ClientManager::FetchReceived(quicr::ConnectionHandle connection_handle,
uint64_t request_id,
const quicr::FullTrackName& track_full_name,
quicr::messages::SubscriberPriority priority,
uint8_t priority,
quicr::messages::GroupOrder group_order,
quicr::messages::Location start,
quicr::messages::FetchEndLocation end)
Expand Down Expand Up @@ -1012,6 +1033,7 @@ namespace laps {
kDefaultPriority,
quicr::messages::GroupOrder::kAscending,
std::chrono::milliseconds(kDefaultObjectTtl),
std::chrono::milliseconds(0),
quicr::messages::FilterType::kLargestObject,
1,
true,
Expand Down Expand Up @@ -1137,17 +1159,13 @@ namespace laps {
start_location });

// Always send updates to peers to support subscribe updates and refresh group support
quicr::messages::Subscribe sub(
request_id,
track_full_name.name_space,
track_full_name.name,
attrs.priority,
attrs.group_order,
true,
quicr::messages::FilterType::kLargestObject, // Filters are only for edge to apply
std::nullopt,
std::nullopt,
{});
auto params =
quicr::messages::Parameters{}
.Add(quicr::messages::ParameterType::kSubscriberPriority, attrs.priority)
.Add(quicr::messages::ParameterType::kGroupOrder, attrs.group_order)
.Add(quicr::messages::ParameterType::kSubscriptionFilter, quicr::messages::FilterType::kLargestObject);

quicr::messages::Subscribe sub(request_id, track_full_name.name_space, track_full_name.name, params);

// TODO: Current new group is not sent by client in subscribe. It's only in subscribe updates.

Expand Down
9 changes: 5 additions & 4 deletions src/client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,17 @@ namespace laps {
const ConnectionRemoteInfo& remote) override;

void SubscribeNamespaceReceived(quicr::ConnectionHandle connection_handle,
quicr::DataContextId data_ctx_id,
const quicr::TrackNamespace& prefix_namespace,
const quicr::SubscribeNamespaceAttributes& attributes) override;

void UnsubscribeNamespaceReceived(quicr::ConnectionHandle connection_handle,
quicr::DataContextId data_ctx_id,
const quicr::TrackNamespace& prefix_namespace) override;

std::vector<quicr::ConnectionHandle> PublishNamespaceDoneReceived(
quicr::ConnectionHandle connection_handle,
const quicr::TrackNamespace& track_namespace) override;
quicr::messages::RequestID request_id) override;

void PublishNamespaceReceived(quicr::ConnectionHandle connection_handle,
const quicr::TrackNamespace& track_namespace,
Expand All @@ -90,8 +92,7 @@ namespace laps {

void TrackStatusReceived(quicr::ConnectionHandle connection_handle,
uint64_t request_id,
const quicr::FullTrackName& track_full_name,
const quicr::messages::SubscribeAttributes& subscribe_attributes) override;
const quicr::FullTrackName& track_full_name) override;

std::optional<quicr::messages::Location> GetLargestAvailable(const quicr::FullTrackName& track_name);

Expand Down Expand Up @@ -132,7 +133,7 @@ namespace laps {
void FetchReceived(quicr::ConnectionHandle connection_handle,
uint64_t request_id,
const quicr::FullTrackName& track_full_name,
quicr::messages::SubscriberPriority priority,
uint8_t priority,
quicr::messages::GroupOrder group_order,
quicr::messages::Location start,
quicr::messages::FetchEndLocation end);
Expand Down
Loading
Loading