Skip to content

Commit 6171c01

Browse files
support for default recovery option without last sample miss detection
1 parent b216284 commit 6171c01

File tree

3 files changed

+50
-21
lines changed

3 files changed

+50
-21
lines changed

examples/zenohc/z_advanced_sub.cxx

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,12 @@ int _main(int argc, char **argv) {
4848
ext::SessionExt::AdvancedSubscriberOptions opts;
4949
opts.history.emplace().detect_late_publishers = true;
5050
opts.history->detect_late_publishers = true;
51-
opts.recovery.emplace(); // enable recovery based on received heartbeats from ext::AdvancedPublisher
51+
// enable recovery based on received heartbeats from ext::AdvancedPublisher
52+
opts.recovery.emplace().last_sample_miss_detection =
53+
ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions::Heartbeat{};
5254
// alternatively recovery can be triggered based on missed sample detection via periodic queries:
53-
// opts.recovery.emplace().periodic_queries_period_ms = 1000;
55+
// opts.recovery.emplace().last_sample_miss_detection =
56+
// ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions::PeriodicQueriesOptions{1000};
5457
opts.subscriber_detection = true;
5558

5659
auto data_handler = [](const Sample &sample) {

include/zenoh/api/ext/session_ext.hxx

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -286,12 +286,14 @@ class SessionExt {
286286
/// @brief Create default option settings.
287287
static CacheOptions create_default() { return {}; }
288288
};
289-
// @brief Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission.
289+
290+
/// @brief Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission.
290291
struct SampleMissDetectionOptions {
291-
// The period of publisher heartbeats in ms, which can be used by ``AdvancedSubscriber`` for missed sample
292-
// detection (if heartbeat-based recovery is enabled). If this value is unset, the subscribers will only be
293-
// notified about missed samples if they opt to send periodic queries.
292+
/// The period of publisher heartbeats in ms, which can be used by ``AdvancedSubscriber`` for missed sample
293+
/// detection (if heartbeat-based recovery is enabled).
294+
/// Otherwise, missed samples will be retransmitted based on Advanced Subscriber queries.
294295
std::optional<uint64_t> heartbeat_period_ms = {};
296+
295297
/// @brief Create default option settings.
296298
static SampleMissDetectionOptions create_default() { return {}; }
297299
};
@@ -392,14 +394,32 @@ class SessionExt {
392394
struct RecoveryOptions {
393395
/// @name Fields
394396

395-
/// @brief Period for queries for not yet received Samples.
396-
///
397-
/// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
398-
/// So it is useful for sporadic publications but useless for periodic publications
399-
/// with a period smaller or equal to this period.
400-
/// If unset, the subscriber will be instead notified about missed samples through ``AdvancedPublisher``
401-
/// heartbeats (if enabled on publisher side).
402-
std::optional<uint64_t> periodic_queries_period_ms = {};
397+
/// @brief Option tag for Heartbeat-based last sample detection.
398+
struct Heartbeat {};
399+
400+
/// @brief Settings for periodic queries-based last sample detection.
401+
struct PeriodicQueriesOptions {
402+
/// @name Fields
403+
404+
/// @brief Period for queries for not yet received Samples.
405+
///
406+
/// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
407+
/// So it is useful for sporadic publications but useless for periodic publications
408+
/// with a period smaller or equal to this period.
409+
uint64_t period_ms = 1000;
410+
411+
/// @name Methods
412+
413+
/// @brief Create default option settings.
414+
static PeriodicQueriesOptions create_default() { return {}; };
415+
};
416+
417+
/// @brief Setting for detecting last sample(s) miss.
418+
/// Note that it does not affect intermediate sample miss detection/retrieval (which is performed
419+
/// automatically as long as recovery is enabled). If this option is disabled, subscriber will be unable to
420+
/// detect/request retransmission of missed sample until it receives a more recent one from the same
421+
/// publisher.
422+
std::optional<std::variant<Heartbeat, PeriodicQueriesOptions>> last_sample_miss_detection = {};
403423

404424
/// @name Methods
405425

@@ -443,12 +463,18 @@ class SessionExt {
443463
}
444464
if (this->recovery.has_value()) {
445465
opts.recovery.is_enabled = true;
446-
if (this->recovery->periodic_queries_period_ms.has_value()) {
447-
// treat 0 as very small delay
448-
opts.recovery.periodic_queries_period_ms =
449-
std::max<uint64_t>(1, this->recovery->periodic_queries_period_ms.value());
450-
} else {
451-
opts.recovery.periodic_queries_period_ms = 0;
466+
if (this->recovery->last_sample_miss_detection.has_value()) {
467+
opts.recovery.last_sample_miss_detection.is_enabled = true;
468+
if (std::holds_alternative<RecoveryOptions::Heartbeat>(
469+
this->recovery->last_sample_miss_detection.value())) {
470+
opts.recovery.last_sample_miss_detection.periodic_queries_period_ms = 0;
471+
} else {
472+
// treat 0 as very small delay
473+
opts.recovery.last_sample_miss_detection.periodic_queries_period_ms =
474+
std::max<uint64_t>(1, std::get<RecoveryOptions::PeriodicQueriesOptions>(
475+
this->recovery->last_sample_miss_detection.value())
476+
.period_ms);
477+
}
452478
}
453479
}
454480
opts.query_timeout_ms = this->query_timeout_ms;

0 commit comments

Comments
 (0)