Skip to content

Commit

Permalink
support for default recovery option without last sample miss detection (
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 authored Jan 16, 2025
1 parent b216284 commit bbfef04
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 21 deletions.
7 changes: 5 additions & 2 deletions examples/zenohc/z_advanced_sub.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ int _main(int argc, char **argv) {
ext::SessionExt::AdvancedSubscriberOptions opts;
opts.history.emplace().detect_late_publishers = true;
opts.history->detect_late_publishers = true;
opts.recovery.emplace(); // enable recovery based on received heartbeats from ext::AdvancedPublisher
// enable recovery based on received heartbeats from ext::AdvancedPublisher
opts.recovery.emplace().last_sample_miss_detection =
ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions::Heartbeat{};
// alternatively recovery can be triggered based on missed sample detection via periodic queries:
// opts.recovery.emplace().periodic_queries_period_ms = 1000;
// opts.recovery.emplace().last_sample_miss_detection =
// ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions::PeriodicQueriesOptions{1000};
opts.subscriber_detection = true;

auto data_handler = [](const Sample &sample) {
Expand Down
62 changes: 44 additions & 18 deletions include/zenoh/api/ext/session_ext.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,14 @@ class SessionExt {
/// @brief Create default option settings.
static CacheOptions create_default() { return {}; }
};
// @brief Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission.

/// @brief Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission.
struct SampleMissDetectionOptions {
// The period of publisher heartbeats in ms, which can be used by ``AdvancedSubscriber`` for missed sample
// detection (if heartbeat-based recovery is enabled). If this value is unset, the subscribers will only be
// notified about missed samples if they opt to send periodic queries.
/// The period of publisher heartbeats in ms, which can be used by ``AdvancedSubscriber`` for missed sample
/// detection (if heartbeat-based recovery is enabled).
/// Otherwise, missed samples will be retransmitted based on Advanced Subscriber queries.
std::optional<uint64_t> heartbeat_period_ms = {};

/// @brief Create default option settings.
static SampleMissDetectionOptions create_default() { return {}; }
};
Expand Down Expand Up @@ -392,14 +394,32 @@ class SessionExt {
struct RecoveryOptions {
/// @name Fields

/// @brief Period for queries for not yet received Samples.
///
/// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
/// So it is useful for sporadic publications but useless for periodic publications
/// with a period smaller or equal to this period.
/// If unset, the subscriber will be instead notified about missed samples through ``AdvancedPublisher``
/// heartbeats (if enabled on publisher side).
std::optional<uint64_t> periodic_queries_period_ms = {};
/// @brief Option tag for Heartbeat-based last sample detection.
struct Heartbeat {};

/// @brief Settings for periodic queries-based last sample detection.
struct PeriodicQueriesOptions {
/// @name Fields

/// @brief Period for queries for not yet received Samples.
///
/// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
/// So it is useful for sporadic publications but useless for periodic publications
/// with a period smaller or equal to this period.
uint64_t period_ms = 1000;

/// @name Methods

/// @brief Create default option settings.
static PeriodicQueriesOptions create_default() { return {}; };
};

/// @brief Setting for detecting last sample(s) miss.
/// Note that it does not affect intermediate sample miss detection/retrieval (which is performed
/// automatically as long as recovery is enabled). If this option is disabled, subscriber will be unable to
/// detect/request retransmission of missed sample until it receives a more recent one from the same
/// publisher.
std::optional<std::variant<Heartbeat, PeriodicQueriesOptions>> last_sample_miss_detection = {};

/// @name Methods

Expand Down Expand Up @@ -443,12 +463,18 @@ class SessionExt {
}
if (this->recovery.has_value()) {
opts.recovery.is_enabled = true;
if (this->recovery->periodic_queries_period_ms.has_value()) {
// treat 0 as very small delay
opts.recovery.periodic_queries_period_ms =
std::max<uint64_t>(1, this->recovery->periodic_queries_period_ms.value());
} else {
opts.recovery.periodic_queries_period_ms = 0;
if (this->recovery->last_sample_miss_detection.has_value()) {
opts.recovery.last_sample_miss_detection.is_enabled = true;
if (std::holds_alternative<RecoveryOptions::Heartbeat>(
this->recovery->last_sample_miss_detection.value())) {
opts.recovery.last_sample_miss_detection.periodic_queries_period_ms = 0;
} else {
// treat 0 as very small delay
opts.recovery.last_sample_miss_detection.periodic_queries_period_ms =
std::max<uint64_t>(1, std::get<RecoveryOptions::PeriodicQueriesOptions>(
this->recovery->last_sample_miss_detection.value())
.period_ms);
}
}
}
opts.query_timeout_ms = this->query_timeout_ms;
Expand Down

0 comments on commit bbfef04

Please sign in to comment.