Skip to content

Commit

Permalink
Don't delete existing subscription if failed to create a new one (#1923
Browse files Browse the repository at this point in the history
…) (#1930)

Signed-off-by: Michael Orlov <morlovmr@gmail.com>
(cherry picked from commit 94969bd)

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
  • Loading branch information
mergify[bot] and MichaelOrlov authored Mar 4, 2025
1 parent a5818fc commit f61c186
Showing 1 changed file with 15 additions and 16 deletions.
31 changes: 15 additions & 16 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,21 +490,20 @@ void RecorderImpl::topics_discovery()
}
while (rclcpp::ok() && discovery_running_) {
try {
auto topics_to_subscribe = get_requested_or_available_topics();
for (const auto & topic_and_type : topics_to_subscribe) {
warn_if_new_qos_for_subscribed_topic(topic_and_type.first);
}
auto missing_topics = get_missing_topics(topics_to_subscribe);
subscribe_topics(missing_topics);

if (!record_options_.topics.empty() &&
subscriptions_.size() == record_options_.topics.size())
{
RCLCPP_INFO(
node->get_logger(),
"All requested topics are subscribed. Stopping discovery...");
node->get_logger(), "All requested topics are subscribed. Stopping discovery...");
return;
}

auto topics_to_subscribe = get_requested_or_available_topics();
for (const auto & topic_and_type : topics_to_subscribe) {
warn_if_new_qos_for_subscribed_topic(topic_and_type.first);
}
auto missing_topics = get_missing_topics(topics_to_subscribe);
subscribe_topics(missing_topics);
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(node->get_logger(), "Failure in topics discovery.\nError: " << e.what());
} catch (...) {
Expand All @@ -525,9 +524,9 @@ std::unordered_map<std::string, std::string>
RecorderImpl::get_missing_topics(const std::unordered_map<std::string, std::string> & all_topics)
{
std::unordered_map<std::string, std::string> missing_topics;
for (const auto & i : all_topics) {
if (subscriptions_.find(i.first) == subscriptions_.end()) {
missing_topics.emplace(i.first, i.second);
for (const auto & [topic_name, topic_type] : all_topics) {
if (subscriptions_.find(topic_name) == subscriptions_.end()) {
missing_topics.emplace(topic_name, topic_type);
}
}
return missing_topics;
Expand All @@ -553,6 +552,9 @@ void RecorderImpl::subscribe_topics(

void RecorderImpl::subscribe_topic(const rosbag2_storage::TopicMetadata & topic)
{
if (subscriptions_.find(topic.name) != subscriptions_.end()) {
return;
}
// Need to create topic in writer before we are trying to create subscription. Since in
// callback for subscription we are calling writer_->write(bag_message); and it could happened
// that callback called before we reached out the line: writer_->create_topic(topic)
Expand All @@ -563,12 +565,9 @@ void RecorderImpl::subscribe_topic(const rosbag2_storage::TopicMetadata & topic)
auto subscription = create_subscription(topic.name, topic.type, subscription_qos);
if (subscription) {
subscriptions_.insert({topic.name, subscription});
RCLCPP_INFO_STREAM(
node->get_logger(),
"Subscribed to topic '" << topic.name << "'");
RCLCPP_INFO_STREAM(node->get_logger(), "Subscribed to topic '" << topic.name << "'");
} else {
writer_->remove_topic(topic);
subscriptions_.erase(topic.name);
}
}

Expand Down

0 comments on commit f61c186

Please sign in to comment.