Skip to content

Commit

Permalink
perf(velodyne): move mt_queue from before decoder to after it, improv…
Browse files Browse the repository at this point in the history
…ing performance
  • Loading branch information
mojomex committed Jul 23, 2024
1 parent 3de7995 commit f77ee3b
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 51 deletions.
30 changes: 22 additions & 8 deletions nebula_ros/include/nebula_ros/velodyne/decoder_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include "nebula_ros/common/mt_queue.hpp"
#include "nebula_ros/common/parameter_descriptors.hpp"
#include "nebula_ros/common/watchdog_timer.hpp"

Expand All @@ -25,6 +26,7 @@
#include <rclcpp/rclcpp.hpp>

#include <nebula_msgs/msg/nebula_packet.hpp>
#include <nebula_msgs/msg/nebula_packets.hpp>
#include <velodyne_msgs/msg/velodyne_packet.hpp>
#include <velodyne_msgs/msg/velodyne_scan.hpp>

Expand Down Expand Up @@ -64,11 +66,20 @@ class VelodyneDecoderWrapper
nebula::Status Status();

private:
struct PublishData
{
nebula_msgs::msg::NebulaPackets::UniquePtr packets;
drivers::NebulaPointCloudPtr cloud;
double cloud_timestamp_s;
};

/// @brief Load calibration data from file
/// @param calibration_file_path The file to read from
/// @return The calibration data if successful, or an error code if not
get_calibration_result_t GetCalibrationData(const std::string & calibration_file_path);

void publish(PublishData && data);

void PublishCloud(
std::unique_ptr<sensor_msgs::msg::PointCloud2> pointcloud,
const rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr & publisher);
Expand All @@ -88,18 +99,21 @@ class VelodyneDecoderWrapper
const std::shared_ptr<nebula::drivers::VelodyneHwInterface> hw_interface_;
std::shared_ptr<const nebula::drivers::VelodyneSensorConfiguration> sensor_cfg_;

std::string calibration_file_path_{};
std::shared_ptr<const drivers::VelodyneCalibrationConfiguration> calibration_cfg_ptr_{};
std::string calibration_file_path_;
std::shared_ptr<const drivers::VelodyneCalibrationConfiguration> calibration_cfg_ptr_;

std::shared_ptr<drivers::VelodyneDriver> driver_ptr_{};
std::shared_ptr<drivers::VelodyneDriver> driver_ptr_;
std::mutex mtx_driver_ptr_;

rclcpp::Publisher<velodyne_msgs::msg::VelodyneScan>::SharedPtr packets_pub_{};
velodyne_msgs::msg::VelodyneScan::UniquePtr current_scan_msg_{};
mt_queue<PublishData> publish_queue_;
std::thread pub_thread_;

rclcpp::Publisher<velodyne_msgs::msg::VelodyneScan>::SharedPtr packets_pub_;
nebula_msgs::msg::NebulaPackets::UniquePtr current_scan_msg_;

rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr nebula_points_pub_{};
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_ex_pub_{};
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_base_pub_{};
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr nebula_points_pub_;
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_ex_pub_;
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr aw_points_base_pub_;

std::shared_ptr<WatchdogTimer> cloud_watchdog_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include "nebula_ros/common/mt_queue.hpp"
#include "nebula_ros/common/parameter_descriptors.hpp"
#include "nebula_ros/velodyne/decoder_wrapper.hpp"
#include "nebula_ros/velodyne/hw_interface_wrapper.hpp"
Expand Down Expand Up @@ -84,11 +83,6 @@ class VelodyneRosWrapper final : public rclcpp::Node

std::shared_ptr<const nebula::drivers::VelodyneSensorConfiguration> sensor_cfg_ptr_{};

/// @brief Stores received packets that have not been processed yet by the decoder thread
mt_queue<std::unique_ptr<nebula_msgs::msg::NebulaPacket>> packet_queue_;
/// @brief Thread to isolate decoding from receiving
std::thread decoder_thread_;

rclcpp::Subscription<velodyne_msgs::msg::VelodyneScan>::SharedPtr packets_sub_{};

bool launch_hw_;
Expand Down
69 changes: 47 additions & 22 deletions nebula_ros/src/velodyne/decoder_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

#include "nebula_ros/velodyne/decoder_wrapper.hpp"

#include "nebula_msgs/msg/detail/nebula_packets__struct.hpp"
#include <velodyne_msgs/msg/detail/velodyne_scan__struct.hpp>

#include <memory>

namespace nebula
{
namespace ros
Expand All @@ -16,7 +21,9 @@ VelodyneDecoderWrapper::VelodyneDecoderWrapper(
: status_(nebula::Status::NOT_INITIALIZED),
logger_(parent_node->get_logger().get_child("VelodyneDecoder")),
hw_interface_(hw_interface),
sensor_cfg_(config)
sensor_cfg_(config),
publish_queue_(1),
current_scan_msg_(std::make_unique<nebula_msgs::msg::NebulaPackets>())
{
if (!config) {
throw std::runtime_error(
Expand Down Expand Up @@ -48,14 +55,12 @@ VelodyneDecoderWrapper::VelodyneDecoderWrapper(

// Publish packets only if HW interface is connected
if (hw_interface_) {
current_scan_msg_ = std::make_unique<velodyne_msgs::msg::VelodyneScan>();
packets_pub_ = parent_node->create_publisher<velodyne_msgs::msg::VelodyneScan>(
"velodyne_packets", rclcpp::SensorDataQoS());
}

auto qos_profile = rmw_qos_profile_sensor_data;
auto pointcloud_qos =
rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 10), qos_profile);
auto pointcloud_qos = rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 1), qos_profile);

nebula_points_pub_ =
parent_node->create_publisher<sensor_msgs::msg::PointCloud2>("velodyne_points", pointcloud_qos);
Expand All @@ -72,6 +77,13 @@ VelodyneDecoderWrapper::VelodyneDecoderWrapper(
RCLCPP_WARN_THROTTLE(
logger_, *parent_node->get_clock(), 5000, "Missed pointcloud output deadline");
});

pub_thread_ = std::thread([&]() {
while (true) {
auto publish_data = publish_queue_.pop();
publish(std::move(publish_data));
}
});
}

void VelodyneDecoderWrapper::OnConfigChange(
Expand Down Expand Up @@ -158,47 +170,62 @@ VelodyneDecoderWrapper::get_calibration_result_t VelodyneDecoderWrapper::GetCali
void VelodyneDecoderWrapper::ProcessCloudPacket(
std::unique_ptr<nebula_msgs::msg::NebulaPacket> packet_msg)
{
auto & packet = *packet_msg;

// Accumulate packets for recording only if someone is subscribed to the topic (for performance)
if (
hw_interface_ && (packets_pub_->get_subscription_count() > 0 ||
packets_pub_->get_intra_process_subscription_count() > 0)) {
if (hw_interface_) {
if (current_scan_msg_->packets.size() == 0) {
current_scan_msg_->header.stamp = packet_msg->stamp;
current_scan_msg_->header.frame_id = sensor_cfg_->frame_id;
}

velodyne_msgs::msg::VelodynePacket velodyne_packet_msg{};
velodyne_packet_msg.stamp = packet_msg->stamp;
std::copy(packet_msg->data.begin(), packet_msg->data.end(), velodyne_packet_msg.data.begin());
current_scan_msg_->packets.emplace_back(std::move(velodyne_packet_msg));
current_scan_msg_->packets.emplace_back(std::move(*packet_msg));
packet = current_scan_msg_->packets.back();
}

std::tuple<nebula::drivers::NebulaPointCloudPtr, double> pointcloud_ts{};
nebula::drivers::NebulaPointCloudPtr pointcloud = nullptr;
{
std::lock_guard lock(mtx_driver_ptr_);
pointcloud_ts = driver_ptr_->ParseCloudPacket(packet_msg->data, packet_msg->stamp.sec);
pointcloud_ts = driver_ptr_->ParseCloudPacket(packet.data, packet.stamp.sec);
pointcloud = std::get<0>(pointcloud_ts);
}

if (pointcloud == nullptr) {
return;
}

publish_queue_.try_push({std::move(current_scan_msg_), pointcloud, std::get<1>(pointcloud_ts)});
current_scan_msg_ = std::make_unique<nebula_msgs::msg::NebulaPackets>();
}

void VelodyneDecoderWrapper::publish(PublishData && data)
{
auto pointcloud = data.cloud;
auto header_stamp = rclcpp::Time(SecondsToChronoNanoSeconds(data.cloud_timestamp_s).count());

cloud_watchdog_->update();

// Publish scan message only if it has been written to
if (current_scan_msg_ && !current_scan_msg_->packets.empty()) {
packets_pub_->publish(std::move(current_scan_msg_));
current_scan_msg_ = std::make_unique<velodyne_msgs::msg::VelodyneScan>();
auto velodyne_scan = std::make_unique<velodyne_msgs::msg::VelodyneScan>();
velodyne_scan->header = data.packets->header;

for (const auto & pkt : data.packets->packets) {
auto & velodyne_pkt = velodyne_scan->packets.emplace_back();
velodyne_pkt.stamp = pkt.stamp;
std::copy(pkt.data.begin(), pkt.data.end(), velodyne_pkt.data.begin());
}

packets_pub_->publish(std::move(velodyne_scan));
}

if (
nebula_points_pub_->get_subscription_count() > 0 ||
nebula_points_pub_->get_intra_process_subscription_count() > 0) {
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*pointcloud, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
ros_pc_msg_ptr->header.stamp = header_stamp;
PublishCloud(std::move(ros_pc_msg_ptr), nebula_points_pub_);
}
if (
Expand All @@ -208,19 +235,17 @@ void VelodyneDecoderWrapper::ProcessCloudPacket(
nebula::drivers::convertPointXYZIRCAEDTToPointXYZIR(pointcloud);
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*autoware_cloud_xyzi, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
ros_pc_msg_ptr->header.stamp = header_stamp;
PublishCloud(std::move(ros_pc_msg_ptr), aw_points_base_pub_);
}
if (
aw_points_ex_pub_->get_subscription_count() > 0 ||
aw_points_ex_pub_->get_intra_process_subscription_count() > 0) {
const auto autoware_ex_cloud = nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(
pointcloud, std::get<1>(pointcloud_ts));
const auto autoware_ex_cloud =
nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(pointcloud, data.cloud_timestamp_s);
auto ros_pc_msg_ptr = std::make_unique<sensor_msgs::msg::PointCloud2>();
pcl::toROSMsg(*autoware_ex_cloud, *ros_pc_msg_ptr);
ros_pc_msg_ptr->header.stamp =
rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count());
ros_pc_msg_ptr->header.stamp = header_stamp;
PublishCloud(std::move(ros_pc_msg_ptr), aw_points_ex_pub_);
}
}
Expand Down
18 changes: 3 additions & 15 deletions nebula_ros/src/velodyne/velodyne_ros_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ namespace ros
VelodyneRosWrapper::VelodyneRosWrapper(const rclcpp::NodeOptions & options)
: rclcpp::Node("velodyne_ros_wrapper", rclcpp::NodeOptions(options).use_intra_process_comms(true)),
wrapper_status_(Status::NOT_INITIALIZED),
sensor_cfg_ptr_(nullptr),
packet_queue_(3000),
hw_interface_wrapper_(),
hw_monitor_wrapper_(),
decoder_wrapper_()
sensor_cfg_ptr_(nullptr)
{
setvbuf(stdout, NULL, _IONBF, BUFSIZ);

Expand All @@ -40,12 +36,6 @@ VelodyneRosWrapper::VelodyneRosWrapper(const rclcpp::NodeOptions & options)

RCLCPP_DEBUG(get_logger(), "Starting stream");

decoder_thread_ = std::thread([this]() {
while (true) {
decoder_wrapper_->ProcessCloudPacket(packet_queue_.pop());
}
});

if (launch_hw_) {
hw_interface_wrapper_->HwInterface()->RegisterScanCallback(
std::bind(&VelodyneRosWrapper::ReceiveCloudPacketCallback, this, std::placeholders::_1));
Expand Down Expand Up @@ -156,7 +146,7 @@ void VelodyneRosWrapper::ReceiveScanMessageCallback(
nebula_pkt_ptr->stamp = pkt.stamp;
std::copy(pkt.data.begin(), pkt.data.end(), std::back_inserter(nebula_pkt_ptr->data));

packet_queue_.push(std::move(nebula_pkt_ptr));
decoder_wrapper_->ProcessCloudPacket(std::move(nebula_pkt_ptr));
}
}

Expand Down Expand Up @@ -247,9 +237,7 @@ void VelodyneRosWrapper::ReceiveCloudPacketCallback(std::vector<uint8_t> & packe
msg_ptr->stamp.nanosec = static_cast<int>(timestamp_ns % 1'000'000'000);
msg_ptr->data.swap(packet);

if (!packet_queue_.try_push(std::move(msg_ptr))) {
RCLCPP_ERROR_THROTTLE(get_logger(), *get_clock(), 500, "Packet(s) dropped");
}
decoder_wrapper_->ProcessCloudPacket(std::move(msg_ptr));
}

RCLCPP_COMPONENTS_REGISTER_NODE(VelodyneRosWrapper)
Expand Down

0 comments on commit f77ee3b

Please sign in to comment.