Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions src/client_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,8 @@ namespace laps {
const quicr::FullTrackName& track_full_name,
quicr::messages::SubscriberPriority priority,
quicr::messages::GroupOrder group_order,
quicr::messages::Location start,
std::optional<quicr::messages::Location> end)
const quicr::messages::Location& start,
const quicr::messages::FetchEndLocation& end)
{
auto reason_code = quicr::FetchResponse::ReasonCode::kOk;
std::optional<quicr::messages::Location> largest_location = std::nullopt;
Expand All @@ -791,12 +791,11 @@ namespace laps {
reason_code = quicr::FetchResponse::ReasonCode::kNoObjects;
}

if (start.group > end->group || largest_location.value().group < start.group) {
if (start.group > end.group || largest_location.value().group < start.group) {
reason_code = quicr::FetchResponse::ReasonCode::kInvalidRange;
}

const auto& cache_entries =
cache_entry_it->second.Get(start.group, end->group != 0 ? end->group : cache_entry_it->second.Size());
const auto& cache_entries = cache_entry_it->second.Get(start.group, end.group);

if (cache_entries.empty()) {
reason_code = quicr::FetchResponse::ReasonCode::kNoObjects;
Expand All @@ -809,19 +808,15 @@ namespace laps {

stop_fetch_.try_emplace({ connection_handle, request_id }, false);

if (!end.has_value()) {
end = { 0, 0 };
}

SPDLOG_LOGGER_DEBUG(LOGGER,
"Fetch received conn_id: {} request_id: {} range start group: {} start object: {} end "
"group: {} end object: {} largest_location: {}",
connection_handle,
request_id,
start.group,
start.object,
end->group,
end->object,
end.group,
end.object.has_value() ? std::to_string(*end.object) : "to_end",
largest_location.has_value() ? largest_location.value().group : 0);

std::thread retrieve_cache_thread([=, cache_entries = std::move(cache_entries), this] {
Expand All @@ -831,14 +826,8 @@ namespace laps {

if (rc != quicr::FetchResponse::ReasonCode::kOk) {
// Try to see if original publisher can provide the data
auto track_handler = FetchTrackHandler::Create(pub_fetch_h,
track_full_name,
priority,
group_order,
start.group,
end->group,
start.object,
end->object);
auto track_handler =
FetchTrackHandler::Create(pub_fetch_h, track_full_name, priority, group_order, start, end);

quicr::ConnectionHandle pub_connection_handle = 0;

Expand Down Expand Up @@ -933,8 +922,15 @@ namespace laps {
return;
}

if (end->object && object.headers.group_id == end->group &&
object.headers.object_id >= end->object) {
// When intra-group, skip any objects prior to start.
if (start.group == end.group && object.headers.group_id == start.group &&
object.headers.object_id < start.object) {
continue;
}

// Are we done? (end location is inclusive)
if (end.object.has_value() && object.headers.group_id == end.group &&
object.headers.object_id > *end.object) {
return;
}

Expand Down Expand Up @@ -972,23 +968,29 @@ namespace laps {
const quicr::messages::JoiningFetchAttributes& attributes)
{
uint64_t joining_start = 0;
const auto largest = GetLargestAvailable(track_full_name);

if (attributes.relative) {
if (const auto largest = GetLargestAvailable(track_full_name)) {
if (largest.has_value()) {
if (largest->group > attributes.joining_start)
joining_start = largest->group - attributes.joining_start;
}
} else {
joining_start = attributes.joining_start;
}

// For joining fetch, fetch from joining_start to the largest available group
const quicr::messages::FetchEndLocation end_location = {
largest.has_value() ? largest->group : joining_start, std::nullopt
};

FetchReceived(connection_handle,
request_id,
track_full_name,
attributes.priority,
attributes.group_order,
{ joining_start, 0 },
std::nullopt);
end_location);
}

void ClientManager::FetchCancelReceived(quicr::ConnectionHandle connection_handle, uint64_t request_id)
Expand Down
4 changes: 2 additions & 2 deletions src/client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ namespace laps {
const quicr::FullTrackName& track_full_name,
quicr::messages::SubscriberPriority priority,
quicr::messages::GroupOrder group_order,
quicr::messages::Location start,
std::optional<quicr::messages::Location> end);
const quicr::messages::Location& start,
const quicr::messages::FetchEndLocation& end);

State& state_;
const Config& config_;
Expand Down
14 changes: 3 additions & 11 deletions src/fetch_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,9 @@ namespace laps {
const quicr::FullTrackName& full_track_name,
quicr::messages::ObjectPriority priority,
quicr::messages::GroupOrder group_order,
quicr::messages::GroupId start_group,
quicr::messages::GroupId end_group,
quicr::messages::GroupId start_object,
quicr::messages::GroupId end_object)
: quicr::FetchTrackHandler(full_track_name,
priority,
group_order,
start_group,
end_group,
start_object,
end_object)
const quicr::messages::Location& start_location,
const quicr::messages::FetchEndLocation& end_location)
: quicr::FetchTrackHandler(full_track_name, priority, group_order, start_location, end_location)
, publish_fetch_handler_(std::move(publish_fetch_handler))
{
}
Expand Down
18 changes: 6 additions & 12 deletions src/fetch_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,24 @@ namespace laps {
const quicr::FullTrackName& full_track_name,
quicr::messages::ObjectPriority priority,
quicr::messages::GroupOrder group_order,
quicr::messages::GroupId start_group,
quicr::messages::GroupId end_group,
quicr::messages::GroupId start_object,
quicr::messages::GroupId end_object);
const quicr::messages::Location& start_location,
const quicr::messages::FetchEndLocation& end_location);

public:
static std::shared_ptr<FetchTrackHandler> Create(
const std::shared_ptr<quicr::PublishFetchHandler> publish_fetch_handler,
const quicr::FullTrackName& full_track_name,
quicr::messages::ObjectPriority priority,
quicr::messages::GroupOrder group_order,
quicr::messages::GroupId start_group,
quicr::messages::GroupId end_group,
quicr::messages::GroupId start_object,
quicr::messages::GroupId end_object)
const quicr::messages::Location& start_location,
const quicr::messages::FetchEndLocation& end_location)
{
return std::shared_ptr<FetchTrackHandler>(new FetchTrackHandler(publish_fetch_handler,
full_track_name,
priority,
group_order,
start_group,
end_group,
start_object,
end_object));
start_location,
end_location));
}

void StatusChanged(Status status) override;
Expand Down
Loading