diff --git a/nimbro_topic_transport/CMakeLists.txt b/nimbro_topic_transport/CMakeLists.txt index a856dec..4a8459e 100644 --- a/nimbro_topic_transport/CMakeLists.txt +++ b/nimbro_topic_transport/CMakeLists.txt @@ -7,6 +7,8 @@ find_package(catkin REQUIRED COMPONENTS topic_tools rostest message_generation + std_msgs + std_srvs ) add_message_files(FILES diff --git a/nimbro_topic_transport/doc/configuration.md b/nimbro_topic_transport/doc/configuration.md index baaad79..929ded7 100644 --- a/nimbro_topic_transport/doc/configuration.md +++ b/nimbro_topic_transport/doc/configuration.md @@ -67,3 +67,6 @@ parameter is `name`. - `resend`: If the sender does not get a message 1.0/`rate` after the last one, it will re-send the last received one. (UDP only) - `compress`: If true, compress the data on the wire with bz2. + - `latch` (bool): This topic should be latched. If you want it to work correctly + (even when the receiver is started later than the sender), you have to launch the + helper nodes (look in the example launch files). diff --git a/nimbro_topic_transport/launch/tcp_receiver.launch b/nimbro_topic_transport/launch/tcp_receiver.launch index a6cce50..01f3ccb 100644 --- a/nimbro_topic_transport/launch/tcp_receiver.launch +++ b/nimbro_topic_transport/launch/tcp_receiver.launch @@ -1,4 +1,9 @@ + + + + + @@ -8,4 +13,10 @@ + + + + + + diff --git a/nimbro_topic_transport/launch/tcp_receiver_latch_helper.launch b/nimbro_topic_transport/launch/tcp_receiver_latch_helper.launch new file mode 100644 index 0000000..b10a3b6 --- /dev/null +++ b/nimbro_topic_transport/launch/tcp_receiver_latch_helper.launch @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + +services: + - name: "$(arg service_server_name)/publish_latched_topics" + type: "std_srvs/Empty" + + + + \ No newline at end of file diff --git a/nimbro_topic_transport/launch/tcp_sender.launch b/nimbro_topic_transport/launch/tcp_sender.launch index a09fb14..2d8e388 100644 --- a/nimbro_topic_transport/launch/tcp_sender.launch +++ b/nimbro_topic_transport/launch/tcp_sender.launch @@ -1,5 +1,7 @@ + + @@ -11,4 +13,8 @@ + + + + diff --git a/nimbro_topic_transport/launch/tcp_sender_latch_helper.launch b/nimbro_topic_transport/launch/tcp_sender_latch_helper.launch new file mode 100644 index 0000000..c8f9858 --- /dev/null +++ b/nimbro_topic_transport/launch/tcp_sender_latch_helper.launch @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/nimbro_topic_transport/launch/topics.yaml b/nimbro_topic_transport/launch/topics.yaml index 1cdefce..090176e 100644 --- a/nimbro_topic_transport/launch/topics.yaml +++ b/nimbro_topic_transport/launch/topics.yaml @@ -3,3 +3,4 @@ topics: compress: true # enable bz2 compression rate: 1.0 # rate limit at 1Hz - name: "/my_second_topic" + latch: True diff --git a/nimbro_topic_transport/package.xml b/nimbro_topic_transport/package.xml index f16e39b..b5d8738 100644 --- a/nimbro_topic_transport/package.xml +++ b/nimbro_topic_transport/package.xml @@ -9,10 +9,14 @@ catkin roscpp + rospy topic_tools message_generation + std_msgs + std_srvs + plot_msgs diff --git a/nimbro_topic_transport/src/tcp/tcp_packet.h b/nimbro_topic_transport/src/tcp/tcp_packet.h index b77be62..851fae5 100644 --- a/nimbro_topic_transport/src/tcp/tcp_packet.h +++ b/nimbro_topic_transport/src/tcp/tcp_packet.h @@ -11,7 +11,8 @@ namespace nimbro_topic_transport enum TCPFlag { - TCP_FLAG_COMPRESSED = (1 << 0) + TCP_FLAG_COMPRESSED = (1 << 0), + TCP_FLAG_LATCHED = (1 << 1) }; struct TCPHeader diff --git a/nimbro_topic_transport/src/tcp/tcp_receiver.cpp b/nimbro_topic_transport/src/tcp/tcp_receiver.cpp index 135eabb..71692de 100644 --- a/nimbro_topic_transport/src/tcp/tcp_receiver.cpp +++ b/nimbro_topic_transport/src/tcp/tcp_receiver.cpp @@ -10,6 +10,7 @@ #include "tcp_packet.h" #include "../topic_info.h" #include +#include #include @@ -106,6 +107,8 @@ TCPReceiver::TCPReceiver() ); m_nh.param("topic_prefix", m_topicPrefix, std::string()); + + m_pub_ready = m_nh.advertise("/network/receiver_ready", 1, true); } TCPReceiver::~TCPReceiver() @@ -116,6 +119,10 @@ void TCPReceiver::run() { fd_set fds; + std_msgs::String readyMessage; + readyMessage.data = ""; + m_pub_ready.publish(readyMessage); + while(ros::ok()) { ros::spinOnce(); @@ -345,6 +352,10 @@ void TCPReceiver::ClientHandler::run() topic_info::getMsgDef(type) ); + if (header.flags() & TCP_FLAG_LATCHED) { + options.latch = true; + } + // It will take subscribers some time to connect to our publisher. // Therefore, latch messages so they will not be lost. // No, this is often unexpected. Instead, wait before publishing. diff --git a/nimbro_topic_transport/src/tcp/tcp_receiver.h b/nimbro_topic_transport/src/tcp/tcp_receiver.h index 6582146..8642ea6 100644 --- a/nimbro_topic_transport/src/tcp/tcp_receiver.h +++ b/nimbro_topic_transport/src/tcp/tcp_receiver.h @@ -65,6 +65,7 @@ class TCPReceiver ros::WallTimer m_statsTimer; std::string m_topicPrefix; + ros::Publisher m_pub_ready; }; } diff --git a/nimbro_topic_transport/src/tcp/tcp_receiver_latch_helper.py b/nimbro_topic_transport/src/tcp/tcp_receiver_latch_helper.py new file mode 100755 index 0000000..fef9fe3 --- /dev/null +++ b/nimbro_topic_transport/src/tcp/tcp_receiver_latch_helper.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python + +__author__ = "Martin Pecka " + +import rospy +from std_msgs.msg import String +from std_srvs.srv import Empty, EmptyRequest + + +class LatchHelper(object): + """Helper class that calls the publish_latched_messages service of a remote sender whenever a receiver gets ready.""" + + def __init__(self): + + # name of the remote service to be called + self._service_server_name = rospy.get_param("~service_server_name") + + self._ready_subscriber = rospy.Subscriber("/network/receiver_ready", + String, self.receiver_ready_cb, queue_size=1) + + self._service_server = rospy.ServiceProxy(self._service_server_name + "/publish_latched_messages", Empty) + + def receiver_ready_cb(self, receiver_name_msg): + try: + self._service_server.wait_for_service(timeout=0.1) + self._service_server(EmptyRequest()) + except rospy.ROSException: # It means the waiting timed out + # So we have nobody to contact and we can omit calling the service + pass + + +if __name__ == '__main__': + rospy.init_node("tcp_receiver_latch_helper") + helper = LatchHelper() + rospy.spin() diff --git a/nimbro_topic_transport/src/tcp/tcp_sender.cpp b/nimbro_topic_transport/src/tcp/tcp_sender.cpp index 13f8cc0..656c5da 100644 --- a/nimbro_topic_transport/src/tcp/tcp_sender.cpp +++ b/nimbro_topic_transport/src/tcp/tcp_sender.cpp @@ -77,6 +77,9 @@ TCPSender::TCPSender() if(entry.hasMember("compress") && ((bool)entry["compress"]) == true) flags |= TCP_FLAG_COMPRESSED; + if(entry.hasMember("latch") && ((bool)entry["latch"]) == true) + flags |= TCP_FLAG_LATCHED; + boost::function func; func = boost::bind(&TCPSender::send, this, topic, flags, _1); @@ -138,7 +141,10 @@ TCPSender::TCPSender() boost::bind(&TCPSender::updateStats, this) ); m_statsTimer.start(); -} + + m_latchedMessageRequestServer = m_nh.advertiseService( + "publish_latched_messages", &TCPSender::sendLatched, this); +} TCPSender::~TCPSender() { @@ -250,6 +256,9 @@ void TCPSender::send(const std::string& topic, int flags, const topic_tools::Sha if(flags & TCP_FLAG_COMPRESSED) maxDataSize = size + size / 100 + 1200; // taken from bzip2 docs + if (flags & TCP_FLAG_LATCHED) + this->m_latchedMessages[topic] = std::make_pair(shifter, flags); + m_packet.resize( sizeof(TCPHeader) + topic.length() + type.length() + maxDataSize ); @@ -363,6 +372,20 @@ void TCPSender::updateStats() m_sentBytesInStatsInterval = 0; } +bool TCPSender::sendLatched(std_srvs::Empty::Request& request, std_srvs::Empty::Response& response) { + + std::map>::iterator it = + this->m_latchedMessages.begin(); + + // send all latched messages + while (it != this->m_latchedMessages.end()) { + this->send(it->first, it->second.second, it->second.first); + it++; + } + + return true; +} + } diff --git a/nimbro_topic_transport/src/tcp/tcp_sender.h b/nimbro_topic_transport/src/tcp/tcp_sender.h index ebedf5b..bd4e3ce 100644 --- a/nimbro_topic_transport/src/tcp/tcp_sender.h +++ b/nimbro_topic_transport/src/tcp/tcp_sender.h @@ -14,6 +14,10 @@ #include #endif +#include + +#include + #include namespace nimbro_topic_transport @@ -28,6 +32,7 @@ class TCPSender bool connect(); void send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter); + bool sendLatched(std_srvs::Empty::Request& request, std_srvs::Empty::Response& response); private: void updateStats(); @@ -43,6 +48,8 @@ class TCPSender std::vector m_packet; std::vector m_compressionBuf; + std::map > m_latchedMessages; + #if WITH_CONFIG_SERVER std::map>> m_enableTopic; #endif @@ -53,6 +60,7 @@ class TCPSender ros::WallTimer m_statsTimer; uint64_t m_sentBytesInStatsInterval; std::map m_topicSendBytesInStatsInteral; + ros::ServiceServer m_latchedMessageRequestServer; }; }