From 95f2468f8608c4be2d4d9aeb3ba97b4d6841251c Mon Sep 17 00:00:00 2001 From: Trim Bresilla Date: Sat, 24 Jan 2026 23:56:18 +0100 Subject: [PATCH 1/2] feat: re-export rcl bindings --- rclrs/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index 9db4767f..6ae8df65 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -204,7 +204,7 @@ mod worker; #[cfg(test)] mod test_helpers; -mod rcl_bindings; +pub mod rcl_bindings; pub use action::*; pub use arguments::*; From 5e09ba65255fa59bda3feb227237f5d299f5b3ab Mon Sep 17 00:00:00 2001 From: Trim Bresilla Date: Sun, 25 Jan 2026 01:04:05 +0100 Subject: [PATCH 2/2] feat: add serialized pub/sub APIs --- rclrs/src/dynamic_message.rs | 4 ++ rclrs/src/lib.rs | 6 +++ rclrs/src/node.rs | 80 +++++++++++++++++++++++++++- rclrs/src/serialized_message.rs | 40 ++++++++++++++ rclrs/src/serialized_publisher.rs | 34 ++++++++++++ rclrs/src/serialized_subscription.rs | 42 +++++++++++++++ 6 files changed, 205 insertions(+), 1 deletion(-) create mode 100644 rclrs/src/serialized_message.rs create mode 100644 rclrs/src/serialized_publisher.rs create mode 100644 rclrs/src/serialized_subscription.rs diff --git a/rclrs/src/dynamic_message.rs b/rclrs/src/dynamic_message.rs index aae749d0..7197d799 100644 --- a/rclrs/src/dynamic_message.rs +++ b/rclrs/src/dynamic_message.rs @@ -324,6 +324,10 @@ impl DynamicMessageMetadata { pub fn structure(&self) -> &MessageStructure { &self.structure } + + pub(crate) fn type_support_ptr(&self) -> *const rosidl_message_type_support_t { + self.type_support_ptr + } } // ========================= impl for DynamicMessage ========================= diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index 6ae8df65..95d49796 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -193,6 +193,9 @@ mod parameter; mod publisher; mod qos; mod service; +mod serialized_message; +mod serialized_publisher; +mod serialized_subscription; mod subscription; mod time; mod time_source; @@ -220,6 +223,9 @@ pub use node::*; pub use parameter::*; pub use publisher::*; pub use qos::*; +pub use serialized_message::*; +pub use serialized_publisher::*; +pub use serialized_subscription::*; pub use rcl_bindings::rmw_request_id_t; pub use service::*; pub use subscription::*; diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 0583fb9d..1b7c607e 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -44,10 +44,12 @@ use crate::{ IntoNodeTimerOneshotCallback, IntoNodeTimerRepeatingCallback, IntoTimerOptions, LogParams, Logger, MessageInfo, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Promise, Publisher, PublisherOptions, PublisherState, RclrsError, RequestedGoal, Service, - ServiceOptions, ServiceState, Subscription, SubscriptionOptions, SubscriptionState, + ServiceOptions, ServiceState, SerializedPublisher, SerializedSubscription, Subscription, + SubscriptionOptions, SubscriptionState, TerminatedGoal, TimeSource, Timer, TimerState, ToLogParams, Worker, WorkerOptions, WorkerState, ENTITY_LIFECYCLE_MUTEX, }; +use crate::ToResult; /// A processing unit that can communicate with other nodes. See the API of /// [`NodeState`] to find out what methods you can call on a [`Node`]. @@ -1493,6 +1495,82 @@ impl NodeState { pub(crate) fn handle(&self) -> &Arc { &self.handle } + + /// Creates a serialized subscription. + /// + /// This receives raw serialized (CDR) bytes, using `rcl_take_serialized_message`. + pub fn create_serialized_subscription<'a>( + &self, + topic_type: MessageTypeName, + options: impl Into>, + ) -> Result { + let SubscriptionOptions { topic, qos } = options.into(); + + // Use the same typesupport resolution as dynamic messages. + let metadata = crate::dynamic_message::DynamicMessageMetadata::new(topic_type)?; + + let mut sub = unsafe { rcl_get_zero_initialized_subscription() }; + let topic_c = std::ffi::CString::new(topic).unwrap(); + + let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap(); + let node = self.handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + + unsafe { + let mut opts = rcl_subscription_get_default_options(); + opts.qos = qos.into(); + rcl_subscription_init( + &mut sub, + &*node, + metadata.type_support_ptr(), + topic_c.as_ptr(), + &opts, + ) + .ok()?; + } + + Ok(SerializedSubscription { + handle: Arc::clone(&self.handle), + sub, + }) + } + + /// Creates a serialized publisher. + /// + /// This publishes raw serialized (CDR) bytes, using `rcl_publish_serialized_message`. + pub fn create_serialized_publisher<'a>( + &self, + topic_type: MessageTypeName, + options: impl Into>, + ) -> Result { + let crate::PublisherOptions { topic, qos } = options.into(); + + let metadata = crate::dynamic_message::DynamicMessageMetadata::new(topic_type)?; + let mut pub_ = unsafe { rcl_get_zero_initialized_publisher() }; + let topic_c = std::ffi::CString::new(topic).unwrap(); + + let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap(); + let node = self.handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + + unsafe { + let mut opts = rcl_publisher_get_default_options(); + opts.qos = qos.into(); + rcl_publisher_init( + &mut pub_, + &*node, + metadata.type_support_ptr(), + topic_c.as_ptr(), + &opts, + ) + .ok()?; + } + + Ok(SerializedPublisher { + handle: Arc::clone(&self.handle), + pub_, + }) + } } impl<'a> ToLogParams<'a> for &'a NodeState { diff --git a/rclrs/src/serialized_message.rs b/rclrs/src/serialized_message.rs new file mode 100644 index 00000000..67251676 --- /dev/null +++ b/rclrs/src/serialized_message.rs @@ -0,0 +1,40 @@ +use crate::{rcl_bindings::*, RclrsError, ToResult}; + +/// A growable serialized message buffer. +/// +/// This wraps `rcl_serialized_message_t` (aka `rmw_serialized_message_t`). +pub struct SerializedMessage { + pub(crate) msg: rcl_serialized_message_t, +} + +unsafe impl Send for SerializedMessage {} + +impl SerializedMessage { + /// Create a new serialized message buffer with the given capacity in bytes. + pub fn new(capacity: usize) -> Result { + unsafe { + let mut msg = rcutils_get_zero_initialized_uint8_array(); + let allocator = rcutils_get_default_allocator(); + rcutils_uint8_array_init(&mut msg, capacity, &allocator).ok()?; + Ok(Self { msg }) + } + } + + /// Return the current serialized payload. + pub fn as_bytes(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self.msg.buffer, self.msg.buffer_length) } + } + + /// Reset the length to 0 without changing capacity. + pub fn clear(&mut self) { + self.msg.buffer_length = 0; + } +} + +impl Drop for SerializedMessage { + fn drop(&mut self) { + unsafe { + let _ = rcutils_uint8_array_fini(&mut self.msg); + } + } +} diff --git a/rclrs/src/serialized_publisher.rs b/rclrs/src/serialized_publisher.rs new file mode 100644 index 00000000..ee235954 --- /dev/null +++ b/rclrs/src/serialized_publisher.rs @@ -0,0 +1,34 @@ +use crate::{node::NodeHandle, rcl_bindings::*, RclrsError, ToResult, ENTITY_LIFECYCLE_MUTEX}; +use std::{ptr, sync::Arc}; + +use crate::serialized_message::SerializedMessage; + +/// A publisher which publishes serialized ROS messages. +pub struct SerializedPublisher { + pub(crate) handle: Arc, + pub(crate) pub_: rcl_publisher_t, +} + +unsafe impl Send for SerializedPublisher {} +unsafe impl Sync for SerializedPublisher {} + +impl Drop for SerializedPublisher { + fn drop(&mut self) { + let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap(); + let mut node = self.handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + unsafe { + let _ = rcl_publisher_fini(&mut self.pub_, &mut *node); + } + } +} + +impl SerializedPublisher { + /// Publish a serialized (CDR) message. + pub fn publish(&self, msg: &SerializedMessage) -> Result<(), RclrsError> { + unsafe { + rcl_publish_serialized_message(&self.pub_, &msg.msg, ptr::null_mut()).ok()?; + } + Ok(()) + } +} diff --git a/rclrs/src/serialized_subscription.rs b/rclrs/src/serialized_subscription.rs new file mode 100644 index 00000000..f4e574f3 --- /dev/null +++ b/rclrs/src/serialized_subscription.rs @@ -0,0 +1,42 @@ +use crate::{node::NodeHandle, rcl_bindings::*, MessageInfo, RclrsError, ENTITY_LIFECYCLE_MUTEX}; +use std::{ptr, sync::Arc}; + +use crate::serialized_message::SerializedMessage; + +/// A subscription which receives serialized ROS messages. +pub struct SerializedSubscription { + pub(crate) handle: Arc, + pub(crate) sub: rcl_subscription_t, +} + +unsafe impl Send for SerializedSubscription {} +unsafe impl Sync for SerializedSubscription {} + +impl Drop for SerializedSubscription { + fn drop(&mut self) { + let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap(); + let mut node = self.handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + unsafe { + let _ = rcl_subscription_fini(&mut self.sub, &mut *node); + } + } +} + +impl SerializedSubscription { + /// Take a serialized (CDR) message. + /// + /// Returns `Ok(None)` when no message is available. + pub fn take(&self, buf: &mut SerializedMessage) -> Result, RclrsError> { + unsafe { + let mut info: rmw_message_info_t = std::mem::zeroed(); + let rc = + rcl_take_serialized_message(&self.sub, &mut buf.msg, &mut info, ptr::null_mut()); + if rc != 0 { + // No message available or error. The rmw/rcl API uses negative codes for "take failed". + return Ok(None); + } + Ok(Some(MessageInfo::from_rmw_message_info(&info))) + } + } +}