Skip to content

Commit

Permalink
perf: move mt_queue from before decoder to after it, improving perfor…
Browse files Browse the repository at this point in the history
…mance
  • Loading branch information
mojomex committed Jul 22, 2024
1 parent c6b6065 commit 70de7e8
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 54 deletions.
35 changes: 27 additions & 8 deletions nebula_ros/include/nebula_ros/hesai/decoder_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,28 @@

#include "nebula_decoders/nebula_decoders_hesai/hesai_driver.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp"
#include "nebula_ros/common/mt_queue.hpp"
#include "nebula_ros/common/parameter_descriptors.hpp"
#include "nebula_ros/common/watchdog_timer.hpp"

#include <nebula_common/hesai/hesai_common.hpp>
#include <nebula_common/nebula_common.hpp>
#include <nebula_common/point_types.hpp>
#include <rclcpp/rclcpp.hpp>

#include "nebula_msgs/msg/detail/nebula_packets__struct.hpp"
#include "nebula_msgs/msg/nebula_packet.hpp"
#include "nebula_msgs/msg/nebula_packets.hpp"
#include "pandar_msgs/msg/pandar_packet.hpp"
#include "pandar_msgs/msg/pandar_scan.hpp"

#include <filesystem>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

namespace nebula
Expand Down Expand Up @@ -63,6 +70,13 @@ class HesaiDecoderWrapper
nebula::Status Status();

private:
struct PublishData
{
nebula_msgs::msg::NebulaPackets::UniquePtr packets;
drivers::NebulaPointCloudPtr cloud;
double cloud_timestamp_s;
};

/// @brief Load calibration data from the best available source:
/// 1. If sensor connected, download and save from sensor
/// 2. If downloaded file available, load that file
Expand All @@ -74,6 +88,8 @@ class HesaiDecoderWrapper
get_calibration_result_t GetCalibrationData(
const std::string & calibration_file_path, bool ignore_others = false);

void publish(PublishData && data);

void PublishCloud(
std::unique_ptr<sensor_msgs::msg::PointCloud2> pointcloud,
const rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr & publisher);
Expand All @@ -93,18 +109,21 @@ class HesaiDecoderWrapper
const std::shared_ptr<nebula::drivers::HesaiHwInterface> hw_interface_;
std::shared_ptr<const nebula::drivers::HesaiSensorConfiguration> sensor_cfg_;

std::string calibration_file_path_{};
std::shared_ptr<const drivers::HesaiCalibrationConfigurationBase> calibration_cfg_ptr_{};
std::string calibration_file_path_;
std::shared_ptr<const drivers::HesaiCalibrationConfigurationBase> calibration_cfg_ptr_;

std::shared_ptr<drivers::HesaiDriver> driver_ptr_{};
std::shared_ptr<drivers::HesaiDriver> driver_ptr_;
std::mutex mtx_driver_ptr_;

rclcpp::Publisher<pandar_msgs::msg::PandarScan>::SharedPtr packets_pub_{};
pandar_msgs::msg::PandarScan::UniquePtr current_scan_msg_{};
mt_queue<PublishData> publish_queue_;
std::thread pub_thread_;

rclcpp::Publisher<pandar_msgs::msg::PandarScan>::SharedPtr packets_pub_;
nebula_msgs::msg::NebulaPackets::UniquePtr current_scan_msg_;

rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr nebula_points_pub_{};
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_ex_pub_{};
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_base_pub_{};
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr nebula_points_pub_;
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_ex_pub_;
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_base_pub_;

std::shared_ptr<WatchdogTimer> cloud_watchdog_;
};
Expand Down
6 changes: 0 additions & 6 deletions nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "nebula_common/hesai/hesai_common.hpp"
#include "nebula_common/nebula_common.hpp"
#include "nebula_common/nebula_status.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp"
#include "nebula_ros/common/mt_queue.hpp"
#include "nebula_ros/common/parameter_descriptors.hpp"
#include "nebula_ros/hesai/decoder_wrapper.hpp"
Expand Down Expand Up @@ -83,11 +82,6 @@ class HesaiRosWrapper final : public rclcpp::Node

std::shared_ptr<const nebula::drivers::HesaiSensorConfiguration> sensor_cfg_ptr_{};

/// @brief Stores received packets that have not been processed yet by the decoder thread
mt_queue<std::unique_ptr<nebula_msgs::msg::NebulaPacket>> packet_queue_;
/// @brief Thread to isolate decoding from receiving
std::thread decoder_thread_;

rclcpp::Subscription<pandar_msgs::msg::PandarScan>::SharedPtr packets_sub_{};

bool launch_hw_;
Expand Down
75 changes: 50 additions & 25 deletions nebula_ros/src/hesai/decoder_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

#include "nebula_ros/hesai/decoder_wrapper.hpp"

#include <rclcpp/logging.hpp>

#include <nebula_msgs/msg/nebula_packets.hpp>
#include <pandar_msgs/msg/pandar_scan.hpp>

#include <algorithm>
#include <memory>
#include <thread>
#include <utility>

#pragma clang diagnostic ignored "-Wbitwise-instead-of-logical"
namespace nebula
{
Expand All @@ -17,7 +27,9 @@ HesaiDecoderWrapper::HesaiDecoderWrapper(
: status_(nebula::Status::NOT_INITIALIZED),
logger_(parent_node->get_logger().get_child("HesaiDecoder")),
hw_interface_(hw_interface),
sensor_cfg_(config)
sensor_cfg_(config),
publish_queue_(1),
current_scan_msg_(std::make_unique<nebula_msgs::msg::NebulaPackets>())
{
if (!config) {
throw std::runtime_error("HesaiDecoderWrapper cannot be instantiated without a valid config!");
Expand Down Expand Up @@ -54,14 +66,12 @@ HesaiDecoderWrapper::HesaiDecoderWrapper(

// Publish packets only if HW interface is connected
if (hw_interface_) {
current_scan_msg_ = std::make_unique<pandar_msgs::msg::PandarScan>();
packets_pub_ = parent_node->create_publisher<pandar_msgs::msg::PandarScan>(
"pandar_packets", rclcpp::SensorDataQoS());
}

auto qos_profile = rmw_qos_profile_sensor_data;
auto pointcloud_qos =
rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 10), qos_profile);
auto pointcloud_qos = rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 1), qos_profile);

nebula_points_pub_ =
parent_node->create_publisher<sensor_msgs::msg::PointCloud2>("pandar_points", pointcloud_qos);
Expand All @@ -78,6 +88,13 @@ HesaiDecoderWrapper::HesaiDecoderWrapper(
RCLCPP_WARN_THROTTLE(
logger_, *parent_node->get_clock(), 5000, "Missed pointcloud output deadline");
});

pub_thread_ = std::thread([&]() {
while (true) {
auto publish_data = publish_queue_.pop();
publish(std::move(publish_data));
}
});
}

void HesaiDecoderWrapper::OnConfigChange(
Expand Down Expand Up @@ -222,25 +239,20 @@ void HesaiDecoderWrapper::ProcessCloudPacket(
std::unique_ptr<nebula_msgs::msg::NebulaPacket> packet_msg)
{
// Accumulate packets for recording only if someone is subscribed to the topic (for performance)
if (
hw_interface_ && (packets_pub_->get_subscription_count() > 0 ||
packets_pub_->get_intra_process_subscription_count() > 0)) {
if (hw_interface_) {
if (current_scan_msg_->packets.size() == 0) {
current_scan_msg_->header.stamp = packet_msg->stamp;
current_scan_msg_->header.frame_id = sensor_cfg_->frame_id;
}

pandar_msgs::msg::PandarPacket pandar_packet_msg{};
pandar_packet_msg.stamp = packet_msg->stamp;
pandar_packet_msg.size = packet_msg->data.size();
std::copy(packet_msg->data.begin(), packet_msg->data.end(), pandar_packet_msg.data.begin());
current_scan_msg_->packets.emplace_back(std::move(pandar_packet_msg));
current_scan_msg_->packets.emplace_back(std::move(*packet_msg));
}

std::tuple<nebula::drivers::NebulaPointCloudPtr, double> pointcloud_ts{};
nebula::drivers::NebulaPointCloudPtr pointcloud = nullptr;
{
std::lock_guard lock(mtx_driver_ptr_);
pointcloud_ts = driver_ptr_->ParseCloudPacket(packet_msg->data);
pointcloud_ts = driver_ptr_->ParseCloudPacket(current_scan_msg_->packets.back().data);
pointcloud = std::get<0>(pointcloud_ts);
}

Expand All @@ -253,21 +265,36 @@ void HesaiDecoderWrapper::ProcessCloudPacket(
return;
}

publish_queue_.try_push({std::move(current_scan_msg_), pointcloud, std::get<1>(pointcloud_ts)});
current_scan_msg_ = std::make_unique<nebula_msgs::msg::NebulaPackets>();
}

void HesaiDecoderWrapper::publish(PublishData && data)
{
auto pointcloud = data.cloud;
auto header_stamp = rclcpp::Time(SecondsToChronoNanoSeconds(data.cloud_timestamp_s).count());

cloud_watchdog_->update();

// Publish scan message only if it has been written to
if (current_scan_msg_ && !current_scan_msg_->packets.empty()) {
packets_pub_->publish(std::move(current_scan_msg_));
current_scan_msg_ = std::make_unique<pandar_msgs::msg::PandarScan>();
if (!data.packets->packets.empty()) {
auto pandar_scan = std::make_unique<pandar_msgs::msg::PandarScan>();
pandar_scan->header = data.packets->header;

for (const auto & pkt : data.packets->packets) {
auto & pandar_pkt = pandar_scan->packets.emplace_back();
pandar_pkt.stamp = pkt.stamp;
std::copy(pkt.data.begin(), pkt.data.end(), pandar_pkt.data.begin());
}

packets_pub_->publish(std::move(pandar_scan));
}

if (
nebula_points_pub_->get_subscription_count() > 0 ||
nebula_points_pub_->get_intra_process_subscription_count() > 0) {
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*pointcloud, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
ros_pc_msg_ptr->header.stamp = header_stamp;
PublishCloud(std::move(ros_pc_msg_ptr), nebula_points_pub_);
}
if (
Expand All @@ -277,19 +304,17 @@ void HesaiDecoderWrapper::ProcessCloudPacket(
nebula::drivers::convertPointXYZIRCAEDTToPointXYZIR(pointcloud);
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*autoware_cloud_xyzi, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
ros_pc_msg_ptr->header.stamp = header_stamp;
PublishCloud(std::move(ros_pc_msg_ptr), aw_points_base_pub_);
}
if (
aw_points_ex_pub_->get_subscription_count() > 0 ||
aw_points_ex_pub_->get_intra_process_subscription_count() > 0) {
const auto autoware_ex_cloud = nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(
pointcloud, std::get<1>(pointcloud_ts));
const auto autoware_ex_cloud =
nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(pointcloud, data.cloud_timestamp_s);
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*autoware_ex_cloud, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
ros_pc_msg_ptr->header.stamp = header_stamp;
PublishCloud(std::move(ros_pc_msg_ptr), aw_points_ex_pub_);
}
}
Expand Down
20 changes: 5 additions & 15 deletions nebula_ros/src/hesai/hesai_ros_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "nebula_ros/hesai/hesai_ros_wrapper.hpp"

#include <utility>

#pragma clang diagnostic ignored "-Wbitwise-instead-of-logical"

namespace nebula
Expand All @@ -11,11 +13,7 @@ namespace ros
HesaiRosWrapper::HesaiRosWrapper(const rclcpp::NodeOptions & options)
: rclcpp::Node("hesai_ros_wrapper", rclcpp::NodeOptions(options).use_intra_process_comms(true)),
wrapper_status_(Status::NOT_INITIALIZED),
sensor_cfg_ptr_(nullptr),
packet_queue_(3000),
hw_interface_wrapper_(),
hw_monitor_wrapper_(),
decoder_wrapper_()
sensor_cfg_ptr_(nullptr)
{
setvbuf(stdout, NULL, _IONBF, BUFSIZ);

Expand All @@ -40,12 +38,6 @@ HesaiRosWrapper::HesaiRosWrapper(const rclcpp::NodeOptions & options)

RCLCPP_DEBUG(get_logger(), "Starting stream");

decoder_thread_ = std::thread([this]() {
while (true) {
decoder_wrapper_->ProcessCloudPacket(packet_queue_.pop());
}
});

if (launch_hw_) {
hw_interface_wrapper_->HwInterface()->RegisterScanCallback(
std::bind(&HesaiRosWrapper::ReceiveCloudPacketCallback, this, std::placeholders::_1));
Expand Down Expand Up @@ -212,7 +204,7 @@ void HesaiRosWrapper::ReceiveScanMessageCallback(
nebula_pkt_ptr->stamp = pkt.stamp;
std::copy(pkt.data.begin(), pkt.data.end(), std::back_inserter(nebula_pkt_ptr->data));

packet_queue_.push(std::move(nebula_pkt_ptr));
decoder_wrapper_->ProcessCloudPacket(std::move(nebula_pkt_ptr));
}
}

Expand Down Expand Up @@ -304,9 +296,7 @@ void HesaiRosWrapper::ReceiveCloudPacketCallback(std::vector<uint8_t> & packet)
msg_ptr->stamp.nanosec = static_cast<int>(timestamp_ns % 1'000'000'000);
msg_ptr->data.swap(packet);

if (!packet_queue_.try_push(std::move(msg_ptr))) {
RCLCPP_ERROR_THROTTLE(get_logger(), *get_clock(), 500, "Packet(s) dropped");
}
decoder_wrapper_->ProcessCloudPacket(std::move(msg_ptr));
}

RCLCPP_COMPONENTS_REGISTER_NODE(HesaiRosWrapper)
Expand Down

0 comments on commit 70de7e8

Please sign in to comment.