Skip to content

Commit

Permalink
use processing threads
Browse files Browse the repository at this point in the history
  • Loading branch information
christianrauch committed May 26, 2024
1 parent 0accf50 commit c259040
Showing 1 changed file with 110 additions and 85 deletions.
195 changes: 110 additions & 85 deletions src/CameraNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ class CameraNode : public rclcpp::Node
libcamera::Stream *stream;
std::shared_ptr<libcamera::FrameBufferAllocator> allocator;
std::vector<std::unique_ptr<libcamera::Request>> requests;
std::mutex request_lock;
std::vector<std::thread> request_threads;
std::unordered_map<libcamera::Request *, std::unique_ptr<std::mutex>> request_locks;
std::atomic<bool> running;

struct buffer_info_t
{
Expand Down Expand Up @@ -120,6 +122,9 @@ class CameraNode : public rclcpp::Node
void
requestComplete(libcamera::Request *request);

void
process(libcamera::Request *request);

rcl_interfaces::msg::SetParametersResult
onParameterChange(const std::vector<rclcpp::Parameter> &parameters);
};
Expand Down Expand Up @@ -376,6 +381,14 @@ CameraNode::CameraNode(const rclcpp::NodeOptions &options) : Node("camera", opti
requests.push_back(std::move(request));
}

// create a processing thread per request
for (std::unique_ptr<libcamera::Request> &request : requests) {
request_locks[request.get()] = std::make_unique<std::mutex>();
request_locks[request.get()]->try_lock();
running = true;
request_threads.emplace_back(&CameraNode::process, this, request.get());
}

// register callback
camera->requestCompleted.connect(this, &CameraNode::requestComplete);

Expand All @@ -393,11 +406,14 @@ CameraNode::CameraNode(const rclcpp::NodeOptions &options) : Node("camera", opti

CameraNode::~CameraNode()
{
camera->requestCompleted.disconnect();
request_lock.lock();
// stop request processing threads
running = false;
for (std::thread &thread : request_threads)
thread.join();

// stop camera
if (camera->stop())
std::cerr << "failed to stop camera" << std::endl;
request_lock.unlock();
camera->release();
camera_manager.stop();
for (const auto &e : buffer_info)
Expand Down Expand Up @@ -535,93 +551,102 @@ compressImageMsg(const sensor_msgs::msg::Image &source,
void
CameraNode::requestComplete(libcamera::Request *request)
{
request_lock.lock();

if (request->status() == libcamera::Request::RequestComplete) {
assert(request->buffers().size() == 1);

// get the stream and buffer from the request
const libcamera::FrameBuffer *buffer = request->findBuffer(stream);
const libcamera::FrameMetadata &metadata = buffer->metadata();
size_t bytesused = 0;
for (const libcamera::FrameMetadata::Plane &plane : metadata.planes())
bytesused += plane.bytesused;

// set time offset once for accurate timing using the device time
if (time_offset == 0)
time_offset = this->now().nanoseconds() - metadata.timestamp;

// send image data
std_msgs::msg::Header hdr;
hdr.stamp = rclcpp::Time(time_offset + int64_t(metadata.timestamp));
hdr.frame_id = "camera";
const libcamera::StreamConfiguration &cfg = stream->configuration();

auto msg_img = std::make_unique<sensor_msgs::msg::Image>();
auto msg_img_compressed = std::make_unique<sensor_msgs::msg::CompressedImage>();

if (format_type(cfg.pixelFormat) == FormatType::RAW) {
// raw uncompressed image
assert(buffer_info[buffer].size == bytesused);
msg_img->header = hdr;
msg_img->width = cfg.size.width;
msg_img->height = cfg.size.height;
msg_img->step = cfg.stride;
msg_img->encoding = get_ros_encoding(cfg.pixelFormat);
msg_img->is_bigendian = (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__);
msg_img->data.resize(buffer_info[buffer].size);
memcpy(msg_img->data.data(), buffer_info[buffer].data, buffer_info[buffer].size);

// compress to jpeg
if (pub_image_compressed->get_subscription_count())
try {
compressImageMsg(*msg_img, *msg_img_compressed, {cv::IMWRITE_JPEG_QUALITY, jpeg_quality});
}
catch (const cv_bridge::Exception &e) {
RCLCPP_ERROR_STREAM(get_logger(), e.what());
}
}
else if (format_type(cfg.pixelFormat) == FormatType::COMPRESSED) {
// compressed image
assert(bytesused < buffer_info[buffer].size);
msg_img_compressed->header = hdr;
msg_img_compressed->format = get_ros_encoding(cfg.pixelFormat);
msg_img_compressed->data.resize(bytesused);
memcpy(msg_img_compressed->data.data(), buffer_info[buffer].data, bytesused);

// decompress into raw rgb8 image
if (pub_image->get_subscription_count())
cv_bridge::toCvCopy(*msg_img_compressed, "rgb8")->toImageMsg(*msg_img);
}
else {
throw std::runtime_error("unsupported pixel format: " +
stream->configuration().pixelFormat.toString());
}
request_locks[request]->unlock();
}

pub_image->publish(std::move(msg_img));
pub_image_compressed->publish(std::move(msg_img_compressed));
void
CameraNode::process(libcamera::Request *request)
{
while (running) {
// block until request is available
request_locks[request]->lock();

if (request->status() == libcamera::Request::RequestComplete) {
assert(request->buffers().size() == 1);

// get the stream and buffer from the request
const libcamera::FrameBuffer *buffer = request->findBuffer(stream);
const libcamera::FrameMetadata &metadata = buffer->metadata();
size_t bytesused = 0;
for (const libcamera::FrameMetadata::Plane &plane : metadata.planes())
bytesused += plane.bytesused;

// set time offset once for accurate timing using the device time
if (time_offset == 0)
time_offset = this->now().nanoseconds() - metadata.timestamp;

// send image data
std_msgs::msg::Header hdr;
hdr.stamp = rclcpp::Time(time_offset + int64_t(metadata.timestamp));
hdr.frame_id = "camera";
const libcamera::StreamConfiguration &cfg = stream->configuration();

auto msg_img = std::make_unique<sensor_msgs::msg::Image>();
auto msg_img_compressed = std::make_unique<sensor_msgs::msg::CompressedImage>();

if (format_type(cfg.pixelFormat) == FormatType::RAW) {
// raw uncompressed image
assert(buffer_info[buffer].size == bytesused);
msg_img->header = hdr;
msg_img->width = cfg.size.width;
msg_img->height = cfg.size.height;
msg_img->step = cfg.stride;
msg_img->encoding = get_ros_encoding(cfg.pixelFormat);
msg_img->is_bigendian = (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__);
msg_img->data.resize(buffer_info[buffer].size);
memcpy(msg_img->data.data(), buffer_info[buffer].data, buffer_info[buffer].size);

// compress to jpeg
if (pub_image_compressed->get_subscription_count()) {
try {
compressImageMsg(*msg_img, *msg_img_compressed,
{cv::IMWRITE_JPEG_QUALITY, jpeg_quality});
}
catch (const cv_bridge::Exception &e) {
RCLCPP_ERROR_STREAM(get_logger(), e.what());
}
}
}
else if (format_type(cfg.pixelFormat) == FormatType::COMPRESSED) {
// compressed image
assert(bytesused < buffer_info[buffer].size);
msg_img_compressed->header = hdr;
msg_img_compressed->format = get_ros_encoding(cfg.pixelFormat);
msg_img_compressed->data.resize(bytesused);
memcpy(msg_img_compressed->data.data(), buffer_info[buffer].data, bytesused);

// decompress into raw rgb8 image
if (pub_image->get_subscription_count())
cv_bridge::toCvCopy(*msg_img_compressed, "rgb8")->toImageMsg(*msg_img);
}
else {
throw std::runtime_error("unsupported pixel format: " +
stream->configuration().pixelFormat.toString());
}

sensor_msgs::msg::CameraInfo ci = cim.getCameraInfo();
ci.header = hdr;
pub_ci->publish(ci);
}
else if (request->status() == libcamera::Request::RequestCancelled) {
RCLCPP_ERROR_STREAM(get_logger(), "request '" << request->toString() << "' cancelled");
}
pub_image->publish(std::move(msg_img));
pub_image_compressed->publish(std::move(msg_img_compressed));

// queue the request again for the next frame
request->reuse(libcamera::Request::ReuseBuffers);
sensor_msgs::msg::CameraInfo ci = cim.getCameraInfo();
ci.header = hdr;
pub_ci->publish(ci);
}
else if (request->status() == libcamera::Request::RequestCancelled) {
RCLCPP_ERROR_STREAM(get_logger(), "request '" << request->toString() << "' cancelled");
}

// update parameters
parameters_lock.lock();
for (const auto &[id, value] : parameters)
request->controls().set(id, value);
parameters.clear();
parameters_lock.unlock();
// queue the request again for the next frame
request->reuse(libcamera::Request::ReuseBuffers);

camera->queueRequest(request);
// update parameters
parameters_lock.lock();
for (const auto &[id, value] : parameters)
request->controls().set(id, value);
parameters.clear();
parameters_lock.unlock();

request_lock.unlock();
camera->queueRequest(request);
}
}

rcl_interfaces::msg::SetParametersResult
Expand Down

0 comments on commit c259040

Please sign in to comment.