Skip to content
Merged
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ set (CMAKE_POLICY_VERSION_MINIMUM 3.5)
### Define the project name and version
### Version will be added to version output of apps and library
project(laps
VERSION 0.14.40
VERSION 0.14.41
DESCRIPTION "Latency Aware Publish/Subscribe"
LANGUAGES CXX)

Expand Down
2 changes: 1 addition & 1 deletion dependencies/libquicr
Submodule libquicr updated 39 files
+1 −1 Makefile
+14 −21 README.md
+26 −6 c-bridge/include/quicr/quicr_bridge.h
+80 −36 c-bridge/src/quicr_bridge.cpp
+8 −21 cmd/examples/CMakeLists.txt
+346 −90 cmd/examples/client.cpp
+0 −1,084 cmd/examples/server.cpp
+6 −23 include/quicr/client.h
+12 −4 include/quicr/detail/attributes.h
+60 −49 include/quicr/detail/messages.h
+23 −28 include/quicr/detail/quic_transport.h
+156 −0 include/quicr/detail/safe_time_queue.h
+23 −17 include/quicr/detail/transport.h
+24 −48 include/quicr/fetch_track_handler.h
+7 −7 include/quicr/object.h
+1 −0 include/quicr/publish_fetch_handler.h
+34 −20 include/quicr/publish_track_handler.h
+115 −0 include/quicr/subscribe_namespace_handler.h
+37 −5 include/quicr/subscribe_track_handler.h
+1 −0 src/CMakeLists.txt
+22 −25 src/client.cpp
+16 −13 src/fetch_track_handler.cpp
+13 −11 src/joining_fetch_handler.cpp
+12 −4 src/messages.cpp
+8 −5 src/publish_fetch_handler.cpp
+148 −47 src/publish_track_handler.cpp
+14 −9 src/server.cpp
+91 −0 src/subscribe_namespace_handler.cpp
+78 −65 src/subscribe_track_handler.cpp
+79 −40 src/transport.cpp
+374 −423 src/transport_picoquic.cpp
+90 −57 src/transport_picoquic.h
+513 −25 test/integration_test/integration_test.cpp
+0 −11 test/integration_test/test_client.cpp
+0 −11 test/integration_test/test_client.h
+103 −8 test/integration_test/test_server.cpp
+122 −3 test/integration_test/test_server.h
+117 −6 test/moq_data_messages.cpp
+147 −0 test/track_handlers.cpp
49 changes: 25 additions & 24 deletions src/client_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ namespace laps {
quicr::messages::SubscriberPriority priority,
quicr::messages::GroupOrder group_order,
quicr::messages::Location start,
std::optional<quicr::messages::Location> end)
quicr::messages::FetchEndLocation end)
{
auto reason_code = quicr::FetchResponse::ReasonCode::kOk;
std::optional<quicr::messages::Location> largest_location = std::nullopt;
Expand All @@ -791,12 +791,11 @@ namespace laps {
reason_code = quicr::FetchResponse::ReasonCode::kNoObjects;
}

if (start.group > end->group || largest_location.value().group < start.group) {
if (start.group > end.group || largest_location.value().group < start.group) {
reason_code = quicr::FetchResponse::ReasonCode::kInvalidRange;
}

const auto& cache_entries =
cache_entry_it->second.Get(start.group, end->group != 0 ? end->group : cache_entry_it->second.Size());
const auto& cache_entries = cache_entry_it->second.Get(start.group, end.group);

if (cache_entries.empty()) {
reason_code = quicr::FetchResponse::ReasonCode::kNoObjects;
Expand All @@ -809,19 +808,15 @@ namespace laps {

stop_fetch_.try_emplace({ connection_handle, request_id }, false);

if (!end.has_value()) {
end = { 0, 0 };
}

SPDLOG_LOGGER_DEBUG(LOGGER,
"Fetch received conn_id: {} request_id: {} range start group: {} start object: {} end "
"group: {} end object: {} largest_location: {}",
connection_handle,
request_id,
start.group,
start.object,
end->group,
end->object,
end.group,
end.object.value_or(0),
largest_location.has_value() ? largest_location.value().group : 0);

std::thread retrieve_cache_thread([=, cache_entries = std::move(cache_entries), this] {
Expand All @@ -835,10 +830,8 @@ namespace laps {
track_full_name,
priority,
group_order,
start.group,
end->group,
start.object,
end->object);
{ .group = start.group, .object = start.object },
{ .group = end.group, .object = end.object });

quicr::ConnectionHandle pub_connection_handle = 0;

Expand Down Expand Up @@ -933,12 +926,19 @@ namespace laps {
return;
}

if (end->object && object.headers.group_id == end->group &&
object.headers.object_id >= end->object) {
// Start at start object id
if (start.group == object.headers.group_id && start.object > object.headers.object_id) {
continue;
}

// Stop at end object, unless end object is zero
if (end.object.has_value() && object.headers.group_id == end.group &&
object.headers.object_id > *end.object) {
return;
}

SPDLOG_TRACE("Fetching group: {} object: {}", object.headers.group_id, object.headers.object_id);
SPDLOG_LOGGER_TRACE(
LOGGER, "Fetching group: {} object: {}", object.headers.group_id, object.headers.object_id);

try {
pub_fetch_h->PublishObject(object.headers, object.data);
Expand Down Expand Up @@ -972,11 +972,12 @@ namespace laps {
const quicr::messages::JoiningFetchAttributes& attributes)
{
uint64_t joining_start = 0;
std::optional<quicr::messages::Location> largest_location = GetLargestAvailable(track_full_name);

if (attributes.relative) {
if (const auto largest = GetLargestAvailable(track_full_name)) {
if (largest->group > attributes.joining_start)
joining_start = largest->group - attributes.joining_start;
if (largest_location.has_value()) {
if (largest_location->group > attributes.joining_start)
joining_start = largest_location->group - attributes.joining_start;
}
} else {
joining_start = attributes.joining_start;
Expand All @@ -988,7 +989,7 @@ namespace laps {
attributes.priority,
attributes.group_order,
{ joining_start, 0 },
std::nullopt);
{ largest_location->group, std::nullopt });
}

void ClientManager::FetchCancelReceived(quicr::ConnectionHandle connection_handle, uint64_t request_id)
Expand Down Expand Up @@ -1027,9 +1028,9 @@ namespace laps {
break;
}

if (!(it->second->GetPendingNewRquestId().has_value() && *it->second->GetPendingNewRquestId() == 0 &&
group_id == 0) &&
(group_id == 0 || it->second->GetLatestGroupId() < group_id)) {
if (!it->second->GetPendingNewRquestId().has_value() ||
(group_id == 0 && *it->second->GetPendingNewRquestId()) ||
*it->second->GetPendingNewRquestId() < group_id) {

it->second->SetNewGroupRequestId(group_id);
DampenOrUpdateTrackSubscription(it->second, true);
Expand Down
2 changes: 1 addition & 1 deletion src/client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ namespace laps {
quicr::messages::SubscriberPriority priority,
quicr::messages::GroupOrder group_order,
quicr::messages::Location start,
std::optional<quicr::messages::Location> end);
quicr::messages::FetchEndLocation end);

State& state_;
const Config& config_;
Expand Down
38 changes: 15 additions & 23 deletions src/fetch_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,9 @@ namespace laps {
const quicr::FullTrackName& full_track_name,
quicr::messages::ObjectPriority priority,
quicr::messages::GroupOrder group_order,
quicr::messages::GroupId start_group,
quicr::messages::GroupId end_group,
quicr::messages::GroupId start_object,
quicr::messages::GroupId end_object)
: quicr::FetchTrackHandler(full_track_name,
priority,
group_order,
start_group,
end_group,
start_object,
end_object)
const quicr::messages::Location& start_location,
const quicr::messages::FetchEndLocation& end_location)
: quicr::FetchTrackHandler(full_track_name, priority, group_order, start_location, end_location)
, publish_fetch_handler_(std::move(publish_fetch_handler))
{
}
Expand All @@ -31,32 +23,32 @@ namespace laps {
std::shared_ptr<const std::vector<uint8_t>> data)
{
subscribe_track_metrics_.bytes_received += data->size();
auto& stream = streams_[stream_id];

if (is_start) {
// Process the fetch header and update it so that it works for the requestor
stream_buffer_.Clear();
stream.buffer.Clear();

stream_buffer_.InitAny<quicr::messages::FetchHeader>();
stream_buffer_.Push(*data);
stream.buffer.InitAny<quicr::messages::FetchHeader>();
stream.buffer.Push(*data);

// Expect that on initial start of stream, there is enough data to process the stream headers

auto f_hdr = stream_buffer_.GetAny<quicr::messages::FetchHeader>();
if (not(stream_buffer_ >> f_hdr)) {
auto f_hdr = stream.buffer.GetAny<quicr::messages::FetchHeader>();
if (not(stream.buffer >> f_hdr)) {
SPDLOG_ERROR("Not enough data to process new stream headers, stream is invalid len: {} / {}",
stream_buffer_.Size(),
stream.buffer.Size(),
data->size());
// TODO: Add metrics to track this
return;
}

size_t header_size = data->size() - stream_buffer_.Size();
size_t header_size = data->size() - stream.buffer.Size();

SPDLOG_DEBUG("Fetch header added in rid: {} out rid: {} data sz: {} sbuf_size: {} header size: {}",
f_hdr.request_id,
*publish_fetch_handler_->GetRequestId(),
data->size(),
stream_buffer_.Size(),
stream.buffer.Size(),
header_size);

f_hdr.request_id = *publish_fetch_handler_->GetRequestId();
Expand All @@ -65,12 +57,12 @@ namespace laps {

if (header_size < data->size()) {
bytes->insert(bytes->end(), data->begin() + header_size, data->end());
stream_buffer_.Pop(stream_buffer_.Size());
stream.buffer.Pop(stream.buffer.Size());
}

publish_fetch_handler_->ForwardPublishedData(true, std::move(bytes));
publish_fetch_handler_->ForwardPublishedData(true, 0, 0, std::move(bytes));
} else {
publish_fetch_handler_->ForwardPublishedData(false, std::move(data));
publish_fetch_handler_->ForwardPublishedData(false, 0, 0, std::move(data));
}
}

Expand Down
22 changes: 6 additions & 16 deletions src/fetch_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,20 @@ namespace laps {
const quicr::FullTrackName& full_track_name,
quicr::messages::ObjectPriority priority,
quicr::messages::GroupOrder group_order,
quicr::messages::GroupId start_group,
quicr::messages::GroupId end_group,
quicr::messages::GroupId start_object,
quicr::messages::GroupId end_object);
const quicr::messages::Location& start_location,
const quicr::messages::FetchEndLocation& end_location);

public:
static std::shared_ptr<FetchTrackHandler> Create(
const std::shared_ptr<quicr::PublishFetchHandler> publish_fetch_handler,
const quicr::FullTrackName& full_track_name,
quicr::messages::ObjectPriority priority,
quicr::messages::GroupOrder group_order,
quicr::messages::GroupId start_group,
quicr::messages::GroupId end_group,
quicr::messages::GroupId start_object,
quicr::messages::GroupId end_object)
const quicr::messages::Location& start_location,
const quicr::messages::FetchEndLocation& end_location)
{
return std::shared_ptr<FetchTrackHandler>(new FetchTrackHandler(publish_fetch_handler,
full_track_name,
priority,
group_order,
start_group,
end_group,
start_object,
end_object));
return std::shared_ptr<FetchTrackHandler>(new FetchTrackHandler(
publish_fetch_handler, full_track_name, priority, group_order, start_location, end_location));
}

void StatusChanged(Status status) override;
Expand Down
5 changes: 1 addition & 4 deletions src/peering/peer_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ namespace laps::peering {
entry.stream_id = stream_id;

// Update SNS_ID if new stream header included or if datagram (both have sns_id)
if (eflags.new_stream || eflags.use_reliable == false) {
if (is_new_stream || eflags.use_reliable == false) {
auto sns_id_bytes = BytesOf(entry.out_sns_id);
std::copy(sns_id_bytes.rbegin(), sns_id_bytes.rend(), data_out.begin() + 2);
}
Expand Down Expand Up @@ -494,9 +494,6 @@ namespace laps::peering {
case DataType::kNewStream:
set_sns_id = true;
eflags.use_reliable = true;
eflags.new_stream = true;
eflags.clear_tx_queue = true;
eflags.use_reset = true;
break;
}

Expand Down
7 changes: 0 additions & 7 deletions src/peering/peer_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,6 @@ namespace laps::peering {

data_header.Deserialize(*data);

// Set Any object if new stream
if (data_header.type == DataType::kNewStream) {
eflags.new_stream = true;
eflags.clear_tx_queue = true;
eflags.use_reset = true;
}

// Pipeline forward to other peers. Not all data may have been popped, so only forward popped data
manager_.ForwardPeerData(
GetSessionId(), true, stream_id.has_value() ? *stream_id : 0, data_header, data, hdr_len, eflags);
Expand Down
15 changes: 14 additions & 1 deletion src/publish_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace laps {
break;
case Status::kPaused:
reason = "paused";
pipeline_ = false;
// TODO: Pause should likely clear out all subgroups in flight and start over fresh
break;
case Status::kSubscriptionUpdated:
reason = "subscription updated";
Expand Down Expand Up @@ -76,4 +76,17 @@ namespace laps {
metrics.quic.tx_queue_discards,
metrics.quic.tx_queue_size.avg);
}

bool PublishTrackHandler::SentFirstObject(uint32_t group_id, uint32_t subgroup_id)
{
const auto group_it = stream_info_by_group_.find(group_id);
if (group_it != stream_info_by_group_.end()) {
const auto subgroup_it = group_it->second.find(subgroup_id);
if (subgroup_it != group_it->second.end()) {
return subgroup_it->second.last_object_id.has_value();
}
}

return false;
}
}
10 changes: 2 additions & 8 deletions src/publish_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,8 @@ namespace laps {
void StatusChanged(Status status) override;
void MetricsSampled(const quicr::PublishTrackMetrics& metrics) override;

bool pipeline_{ false }; // True indicates using pipeline forwarding, false is object forwarding

/**
* @brief Check if the first object has been sent or not
*
* @return True if one objet has been sent, False if no objects yet
*/
constexpr bool SentFirstObject() const noexcept { return latest_object_id_.has_value(); }
// note: pipelining starts after the first object
bool SentFirstObject(uint32_t group_id, uint32_t subgroup_id);

private:
ClientManager& server_;
Expand Down
Loading
Loading