From 70de7e8de9f4e9c2a7ac0f9098493366a9000259 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Mon, 22 Jul 2024 22:27:31 +0900 Subject: [PATCH] perf: move mt_queue from before decoder to after it, improving performance --- .../nebula_ros/hesai/decoder_wrapper.hpp | 35 +++++++-- .../nebula_ros/hesai/hesai_ros_wrapper.hpp | 6 -- nebula_ros/src/hesai/decoder_wrapper.cpp | 75 ++++++++++++------- nebula_ros/src/hesai/hesai_ros_wrapper.cpp | 20 ++--- 4 files changed, 82 insertions(+), 54 deletions(-) diff --git a/nebula_ros/include/nebula_ros/hesai/decoder_wrapper.hpp b/nebula_ros/include/nebula_ros/hesai/decoder_wrapper.hpp index 396f16d23..f74506710 100644 --- a/nebula_ros/include/nebula_ros/hesai/decoder_wrapper.hpp +++ b/nebula_ros/include/nebula_ros/hesai/decoder_wrapper.hpp @@ -16,14 +16,18 @@ #include "nebula_decoders/nebula_decoders_hesai/hesai_driver.hpp" #include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp" +#include "nebula_ros/common/mt_queue.hpp" #include "nebula_ros/common/parameter_descriptors.hpp" #include "nebula_ros/common/watchdog_timer.hpp" #include #include +#include #include +#include "nebula_msgs/msg/detail/nebula_packets__struct.hpp" #include "nebula_msgs/msg/nebula_packet.hpp" +#include "nebula_msgs/msg/nebula_packets.hpp" #include "pandar_msgs/msg/pandar_packet.hpp" #include "pandar_msgs/msg/pandar_scan.hpp" @@ -31,6 +35,9 @@ #include #include #include +#include +#include +#include #include namespace nebula @@ -63,6 +70,13 @@ class HesaiDecoderWrapper nebula::Status Status(); private: + struct PublishData + { + nebula_msgs::msg::NebulaPackets::UniquePtr packets; + drivers::NebulaPointCloudPtr cloud; + double cloud_timestamp_s; + }; + /// @brief Load calibration data from the best available source: /// 1. If sensor connected, download and save from sensor /// 2. If downloaded file available, load that file @@ -74,6 +88,8 @@ class HesaiDecoderWrapper get_calibration_result_t GetCalibrationData( const std::string & calibration_file_path, bool ignore_others = false); + void publish(PublishData && data); + void PublishCloud( std::unique_ptr pointcloud, const rclcpp::Publisher::SharedPtr & publisher); @@ -93,18 +109,21 @@ class HesaiDecoderWrapper const std::shared_ptr hw_interface_; std::shared_ptr sensor_cfg_; - std::string calibration_file_path_{}; - std::shared_ptr calibration_cfg_ptr_{}; + std::string calibration_file_path_; + std::shared_ptr calibration_cfg_ptr_; - std::shared_ptr driver_ptr_{}; + std::shared_ptr driver_ptr_; std::mutex mtx_driver_ptr_; - rclcpp::Publisher::SharedPtr packets_pub_{}; - pandar_msgs::msg::PandarScan::UniquePtr current_scan_msg_{}; + mt_queue publish_queue_; + std::thread pub_thread_; + + rclcpp::Publisher::SharedPtr packets_pub_; + nebula_msgs::msg::NebulaPackets::UniquePtr current_scan_msg_; - rclcpp::Publisher::SharedPtr nebula_points_pub_{}; - rclcpp::Publisher::SharedPtr aw_points_ex_pub_{}; - rclcpp::Publisher::SharedPtr aw_points_base_pub_{}; + rclcpp::Publisher::SharedPtr nebula_points_pub_; + rclcpp::Publisher::SharedPtr aw_points_ex_pub_; + rclcpp::Publisher::SharedPtr aw_points_base_pub_; std::shared_ptr cloud_watchdog_; }; diff --git a/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp b/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp index 8a70aa0e3..36e70d70a 100644 --- a/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp +++ b/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp @@ -18,7 +18,6 @@ #include "nebula_common/hesai/hesai_common.hpp" #include "nebula_common/nebula_common.hpp" #include "nebula_common/nebula_status.hpp" -#include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp" #include "nebula_ros/common/mt_queue.hpp" #include "nebula_ros/common/parameter_descriptors.hpp" #include "nebula_ros/hesai/decoder_wrapper.hpp" @@ -83,11 +82,6 @@ class HesaiRosWrapper final : public rclcpp::Node std::shared_ptr sensor_cfg_ptr_{}; - /// @brief Stores received packets that have not been processed yet by the decoder thread - mt_queue> packet_queue_; - /// @brief Thread to isolate decoding from receiving - std::thread decoder_thread_; - rclcpp::Subscription::SharedPtr packets_sub_{}; bool launch_hw_; diff --git a/nebula_ros/src/hesai/decoder_wrapper.cpp b/nebula_ros/src/hesai/decoder_wrapper.cpp index b13d9d782..d1e5aa6f5 100644 --- a/nebula_ros/src/hesai/decoder_wrapper.cpp +++ b/nebula_ros/src/hesai/decoder_wrapper.cpp @@ -2,6 +2,16 @@ #include "nebula_ros/hesai/decoder_wrapper.hpp" +#include + +#include +#include + +#include +#include +#include +#include + #pragma clang diagnostic ignored "-Wbitwise-instead-of-logical" namespace nebula { @@ -17,7 +27,9 @@ HesaiDecoderWrapper::HesaiDecoderWrapper( : status_(nebula::Status::NOT_INITIALIZED), logger_(parent_node->get_logger().get_child("HesaiDecoder")), hw_interface_(hw_interface), - sensor_cfg_(config) + sensor_cfg_(config), + publish_queue_(1), + current_scan_msg_(std::make_unique()) { if (!config) { throw std::runtime_error("HesaiDecoderWrapper cannot be instantiated without a valid config!"); @@ -54,14 +66,12 @@ HesaiDecoderWrapper::HesaiDecoderWrapper( // Publish packets only if HW interface is connected if (hw_interface_) { - current_scan_msg_ = std::make_unique(); packets_pub_ = parent_node->create_publisher( "pandar_packets", rclcpp::SensorDataQoS()); } auto qos_profile = rmw_qos_profile_sensor_data; - auto pointcloud_qos = - rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 10), qos_profile); + auto pointcloud_qos = rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 1), qos_profile); nebula_points_pub_ = parent_node->create_publisher("pandar_points", pointcloud_qos); @@ -78,6 +88,13 @@ HesaiDecoderWrapper::HesaiDecoderWrapper( RCLCPP_WARN_THROTTLE( logger_, *parent_node->get_clock(), 5000, "Missed pointcloud output deadline"); }); + + pub_thread_ = std::thread([&]() { + while (true) { + auto publish_data = publish_queue_.pop(); + publish(std::move(publish_data)); + } + }); } void HesaiDecoderWrapper::OnConfigChange( @@ -222,25 +239,20 @@ void HesaiDecoderWrapper::ProcessCloudPacket( std::unique_ptr packet_msg) { // Accumulate packets for recording only if someone is subscribed to the topic (for performance) - if ( - hw_interface_ && (packets_pub_->get_subscription_count() > 0 || - packets_pub_->get_intra_process_subscription_count() > 0)) { + if (hw_interface_) { if (current_scan_msg_->packets.size() == 0) { current_scan_msg_->header.stamp = packet_msg->stamp; + current_scan_msg_->header.frame_id = sensor_cfg_->frame_id; } - pandar_msgs::msg::PandarPacket pandar_packet_msg{}; - pandar_packet_msg.stamp = packet_msg->stamp; - pandar_packet_msg.size = packet_msg->data.size(); - std::copy(packet_msg->data.begin(), packet_msg->data.end(), pandar_packet_msg.data.begin()); - current_scan_msg_->packets.emplace_back(std::move(pandar_packet_msg)); + current_scan_msg_->packets.emplace_back(std::move(*packet_msg)); } std::tuple pointcloud_ts{}; nebula::drivers::NebulaPointCloudPtr pointcloud = nullptr; { std::lock_guard lock(mtx_driver_ptr_); - pointcloud_ts = driver_ptr_->ParseCloudPacket(packet_msg->data); + pointcloud_ts = driver_ptr_->ParseCloudPacket(current_scan_msg_->packets.back().data); pointcloud = std::get<0>(pointcloud_ts); } @@ -253,12 +265,28 @@ void HesaiDecoderWrapper::ProcessCloudPacket( return; } + publish_queue_.try_push({std::move(current_scan_msg_), pointcloud, std::get<1>(pointcloud_ts)}); + current_scan_msg_ = std::make_unique(); +} + +void HesaiDecoderWrapper::publish(PublishData && data) +{ + auto pointcloud = data.cloud; + auto header_stamp = rclcpp::Time(SecondsToChronoNanoSeconds(data.cloud_timestamp_s).count()); + cloud_watchdog_->update(); - // Publish scan message only if it has been written to - if (current_scan_msg_ && !current_scan_msg_->packets.empty()) { - packets_pub_->publish(std::move(current_scan_msg_)); - current_scan_msg_ = std::make_unique(); + if (!data.packets->packets.empty()) { + auto pandar_scan = std::make_unique(); + pandar_scan->header = data.packets->header; + + for (const auto & pkt : data.packets->packets) { + auto & pandar_pkt = pandar_scan->packets.emplace_back(); + pandar_pkt.stamp = pkt.stamp; + std::copy(pkt.data.begin(), pkt.data.end(), pandar_pkt.data.begin()); + } + + packets_pub_->publish(std::move(pandar_scan)); } if ( @@ -266,8 +294,7 @@ void HesaiDecoderWrapper::ProcessCloudPacket( nebula_points_pub_->get_intra_process_subscription_count() > 0) { auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*pointcloud, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), nebula_points_pub_); } if ( @@ -277,19 +304,17 @@ void HesaiDecoderWrapper::ProcessCloudPacket( nebula::drivers::convertPointXYZIRCAEDTToPointXYZIR(pointcloud); auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*autoware_cloud_xyzi, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), aw_points_base_pub_); } if ( aw_points_ex_pub_->get_subscription_count() > 0 || aw_points_ex_pub_->get_intra_process_subscription_count() > 0) { - const auto autoware_ex_cloud = nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT( - pointcloud, std::get<1>(pointcloud_ts)); + const auto autoware_ex_cloud = + nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(pointcloud, data.cloud_timestamp_s); auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*autoware_ex_cloud, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), aw_points_ex_pub_); } } diff --git a/nebula_ros/src/hesai/hesai_ros_wrapper.cpp b/nebula_ros/src/hesai/hesai_ros_wrapper.cpp index 132d0f4d2..ce6b6e544 100644 --- a/nebula_ros/src/hesai/hesai_ros_wrapper.cpp +++ b/nebula_ros/src/hesai/hesai_ros_wrapper.cpp @@ -2,6 +2,8 @@ #include "nebula_ros/hesai/hesai_ros_wrapper.hpp" +#include + #pragma clang diagnostic ignored "-Wbitwise-instead-of-logical" namespace nebula @@ -11,11 +13,7 @@ namespace ros HesaiRosWrapper::HesaiRosWrapper(const rclcpp::NodeOptions & options) : rclcpp::Node("hesai_ros_wrapper", rclcpp::NodeOptions(options).use_intra_process_comms(true)), wrapper_status_(Status::NOT_INITIALIZED), - sensor_cfg_ptr_(nullptr), - packet_queue_(3000), - hw_interface_wrapper_(), - hw_monitor_wrapper_(), - decoder_wrapper_() + sensor_cfg_ptr_(nullptr) { setvbuf(stdout, NULL, _IONBF, BUFSIZ); @@ -40,12 +38,6 @@ HesaiRosWrapper::HesaiRosWrapper(const rclcpp::NodeOptions & options) RCLCPP_DEBUG(get_logger(), "Starting stream"); - decoder_thread_ = std::thread([this]() { - while (true) { - decoder_wrapper_->ProcessCloudPacket(packet_queue_.pop()); - } - }); - if (launch_hw_) { hw_interface_wrapper_->HwInterface()->RegisterScanCallback( std::bind(&HesaiRosWrapper::ReceiveCloudPacketCallback, this, std::placeholders::_1)); @@ -212,7 +204,7 @@ void HesaiRosWrapper::ReceiveScanMessageCallback( nebula_pkt_ptr->stamp = pkt.stamp; std::copy(pkt.data.begin(), pkt.data.end(), std::back_inserter(nebula_pkt_ptr->data)); - packet_queue_.push(std::move(nebula_pkt_ptr)); + decoder_wrapper_->ProcessCloudPacket(std::move(nebula_pkt_ptr)); } } @@ -304,9 +296,7 @@ void HesaiRosWrapper::ReceiveCloudPacketCallback(std::vector & packet) msg_ptr->stamp.nanosec = static_cast(timestamp_ns % 1'000'000'000); msg_ptr->data.swap(packet); - if (!packet_queue_.try_push(std::move(msg_ptr))) { - RCLCPP_ERROR_THROTTLE(get_logger(), *get_clock(), 500, "Packet(s) dropped"); - } + decoder_wrapper_->ProcessCloudPacket(std::move(msg_ptr)); } RCLCPP_COMPONENTS_REGISTER_NODE(HesaiRosWrapper)