From 748793b7b3679c42793d88acbba2186b8540eeaf Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Mon, 9 Feb 2026 05:42:08 +0800 Subject: [PATCH 1/5] fix: ROS1 string decoding, /tf topic support, and schema variant handling This commit fixes several issues with ROS1 bag decoding and schema handling: 1. **ROS1 string decoding**: CDR decoder now correctly handles ROS1 strings where length is the exact byte count (no null terminator), unlike ROS2 CDR strings where length includes null terminator. 2. **MCAP writer schema encoding**: Fixed add_channel_with_schema to use correct schema encoding names ("ros2msg", "jsonschema", "protobuf") instead of message encoding, and pass message encoding to add_channel. 3. **Builtin type variant conflict**: populate_builtin_types now skips adding "/msg/" variant when the non-prefixed variant was already parsed from message_definition (e.g., std_msgs/Header from ROS1 with seq field). 4. **Test coverage**: Added test_decode_dynamic_tf_topic to verify /tf (tf2_msgs/TFMessage) decoding with nested Header and ROS1 strings. Fixes decoding of /tf messages from ROS1 bags containing std_msgs/Header with the seq field (ROS1 variant). --- src/encoding/cdr/decoder.rs | 16 ++++++ src/io/formats/mcap/writer.rs | 23 +++++++-- src/lib.rs | 6 +++ src/schema/parser/unified.rs | 32 +++++++++++- tests/ros1_decode_dynamic_tests.rs | 82 ++++++++++++++++++++++++++++++ 5 files changed, 155 insertions(+), 4 deletions(-) diff --git a/src/encoding/cdr/decoder.rs b/src/encoding/cdr/decoder.rs index d289cfc..cdced30 100644 --- a/src/encoding/cdr/decoder.rs +++ b/src/encoding/cdr/decoder.rs @@ -583,6 +583,9 @@ impl CdrDecoder { } /// Read a string value (matches TS `CdrReader.string()`). + /// + /// CDR/ROS2 strings: length includes null terminator, so we read `len-1` bytes + skip 1. + /// ROS1 strings: length is exact byte count (no null terminator), so we read all `len` bytes. 
fn read_string(&self, cursor: &mut CdrCursor) -> CoreResult<CodecValue> { // Read length prefix (4 bytes) let len = cursor.read_u32()? as usize; @@ -593,6 +596,19 @@ impl CdrDecoder { ))); } + if cursor.is_ros1() { + // ROS1: length is exact byte count, no null terminator + if len == 0 { + return Ok(CodecValue::String(String::new())); + } + let string_bytes = cursor.read_bytes(len)?; + let s = std::str::from_utf8(string_bytes) + .map_err(|e| CodecError::parse("string utf8", format!("{e}")))? + .to_string(); + return Ok(CodecValue::String(s)); + } + + // CDR/ROS2: length includes null terminator if len <= 1 { // Empty string (length 0 or 1 for just null terminator) cursor.skip(len)?; diff --git a/src/io/formats/mcap/writer.rs b/src/io/formats/mcap/writer.rs index cb9cb43..e2d9d6c 100644 --- a/src/io/formats/mcap/writer.rs +++ b/src/io/formats/mcap/writer.rs @@ -1079,15 +1079,32 @@ impl FormatWriter for ParallelMcapWriter> { ) -> Result { // Add schema if provided let schema_id = if let Some(schema_data) = schema { - let schema_name = format!("{message_type}_schema"); - self.add_schema(&schema_name, encoding, schema_data.as_bytes())? + // Use message_type directly as schema name (not message_type_schema) + // so the parser can find the correct type definition + let schema_name = message_type; + // Determine schema encoding based on message type + // ROS message types use "ros2msg" schema encoding with CDR message encoding + // JSON message types use "jsonschema" schema encoding with JSON message encoding + let schema_encoding = if encoding == "cdr" + && (message_type.contains('/') || message_type.contains("msg")) + { + "ros2msg" + } else if encoding == "json" { + "jsonschema" + } else if encoding == "protobuf" { + "protobuf" + } else { + // Default to message encoding for backward compatibility + encoding + }; + self.add_schema(&schema_name, schema_encoding, schema_data.as_bytes())? 
} else { 0 }; // Use the internal add_channel method with empty metadata let empty_metadata = HashMap::new(); - self.add_channel(schema_id, topic, message_type, &empty_metadata) + self.add_channel(schema_id, topic, encoding, &empty_metadata) } fn write(&mut self, message: &RawMessage) -> Result<()> { diff --git a/src/lib.rs b/src/lib.rs index cecbbac..c615e13 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -301,3 +301,9 @@ pub trait Decoder: Send + Sync { // Python bindings (optional feature) #[cfg(feature = "python")] pub mod python; + +// Test helpers (only available when testing) +#[cfg(test)] +pub mod tests { + pub use crate::io::metadata::FileFormat; +} diff --git a/src/schema/parser/unified.rs b/src/schema/parser/unified.rs index 4fa5963..df6dad0 100644 --- a/src/schema/parser/unified.rs +++ b/src/schema/parser/unified.rs @@ -109,10 +109,40 @@ pub fn parse_schema_with_encoding( /// This adds standard ROS2 `builtin_interfaces` types like Time and Duration /// to the schema, ensuring they're available when decoding messages that /// reference them. +/// +/// When a type already exists under a different naming convention from the +/// parsed message_definition (e.g., `std_msgs/Header` with `seq` from ROS1), +/// we skip adding the `/msg/` variant to avoid ambiguous short-name resolution +/// in `get_type_variants`. The parsed definition is authoritative. fn populate_builtin_types(schema: &mut MessageSchema) { + // Snapshot the types that were parsed from the message_definition, + // BEFORE we start adding builtins. These are authoritative. 
+ let parsed_types: std::collections::HashSet<String> = schema.types.keys().cloned().collect(); + for builtin_type in builtin_types::get_all() { // Only add if not already present (user schemas can override) - if !schema.types.contains_key(&builtin_type.name) { + if schema.types.contains_key(&builtin_type.name) { + continue; + } + + // Skip adding a builtin variant if a naming variant was already + // PARSED from the message_definition (not from a previous builtin). + // e.g., skip std_msgs/msg/Header when std_msgs/Header was parsed + // from the ROS1 message_definition (which includes the seq field). + let parsed_variant_exists = if builtin_type.name.contains("/msg/") { + let without_msg = builtin_type.name.replace("/msg/", "/"); + parsed_types.contains(&without_msg) + } else { + let parts: Vec<&str> = builtin_type.name.rsplitn(2, '/').collect(); + if parts.len() == 2 { + let with_msg = format!("{}/msg/{}", parts[1], parts[0]); + parsed_types.contains(&with_msg) + } else { + false + } + }; + + if !parsed_variant_exists { schema.add_type(builtin_type); } } diff --git a/tests/ros1_decode_dynamic_tests.rs b/tests/ros1_decode_dynamic_tests.rs index bdf8dcf..ad115b4 100644 --- a/tests/ros1_decode_dynamic_tests.rs +++ b/tests/ros1_decode_dynamic_tests.rs @@ -319,6 +319,88 @@ fn test_decode_dynamic_sensors_data_raw_topic() { ); } +// ============================================================================ +// Targeted: /tf must decode (tf2_msgs/TFMessage with nested Header) +// ============================================================================ + +/// Ensure `/tf` (tf2_msgs/TFMessage) messages decode successfully. +/// +/// TFMessage contains a dynamic array of geometry_msgs/TransformStamped, +/// each with std_msgs/Header (seq, stamp, frame_id), child_frame_id string, +/// and Transform. This exercises nested Header resolution and ROS1 string +/// reading (no null terminator). Uses fixture `robocodec_test_24_leju_claw.bag`. 
+#[test] +fn test_decode_dynamic_tf_topic() { + let path = fixture_path("robocodec_test_24_leju_claw.bag"); + if !path.exists() { + eprintln!("Skipping: fixture not found at {}", path.display()); + return; + } + + let data = std::fs::read(&path).expect("Failed to read fixture bag"); + + let mut parser = StreamingBagParser::new(); + let mut all_records = Vec::new(); + + for chunk in data.chunks(64 * 1024) { + let records = parser.parse_chunk(chunk).expect("parse_chunk failed"); + all_records.extend(records); + } + + let channels = parser.channels(); + let factory = CodecFactory::new(); + let schema_cache = build_schema_cache(&channels, &factory); + + let mut tf_decoded = 0usize; + let mut tf_errors = Vec::new(); + + for record in &all_records { + let channel_id = record.conn_id as u16; + let Some(channel_info) = channels.get(&channel_id) else { + continue; + }; + + if channel_info.topic != "/tf" { + continue; + } + + let schema = schema_cache + .get(&channel_id) + .expect("Should have schema for /tf"); + + let codec = factory + .get_codec(schema.encoding()) + .expect("Should have CDR codec"); + + match codec.decode_dynamic(&record.data, schema) { + Ok(decoded) => { + tf_decoded += 1; + // TFMessage has "transforms" array + assert!(!decoded.is_empty(), "Decoded TFMessage should have fields"); + assert!( + decoded.contains_key("transforms"), + "Decoded TFMessage should have 'transforms' field" + ); + } + Err(e) => { + tf_errors.push(format!("{e}")); + } + } + } + + assert!( + tf_decoded > 0, + "Expected at least one /tf message to decode, but got 0 (errors: {:?})", + tf_errors + ); + assert!( + tf_errors.is_empty(), + "All /tf messages should decode, but {} failed: {:?}", + tf_errors.len(), + tf_errors + ); +} + // ============================================================================ // Verify ROS1 schema encoding is detected for CDR channels // ============================================================================ From 
f1cdc7195ac5712e33ab812c52d7b819bdbb51b4 Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Mon, 9 Feb 2026 05:43:23 +0800 Subject: [PATCH 2/5] fix: remove needless borrow in add_schema call --- src/io/formats/mcap/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/formats/mcap/writer.rs b/src/io/formats/mcap/writer.rs index e2d9d6c..c0b3ec9 100644 --- a/src/io/formats/mcap/writer.rs +++ b/src/io/formats/mcap/writer.rs @@ -1097,7 +1097,7 @@ impl FormatWriter for ParallelMcapWriter> { // Default to message encoding for backward compatibility encoding }; - self.add_schema(&schema_name, schema_encoding, schema_data.as_bytes())? + self.add_schema(schema_name, schema_encoding, schema_data.as_bytes())? } else { 0 }; From f0d00b3e3a9c517f450ed265499c90b3fa52ea7e Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Mon, 9 Feb 2026 05:45:02 +0800 Subject: [PATCH 3/5] revert: restore MCAP writer to original implementation --- src/io/formats/mcap/writer.rs | 23 +++-------------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/src/io/formats/mcap/writer.rs b/src/io/formats/mcap/writer.rs index c0b3ec9..cb9cb43 100644 --- a/src/io/formats/mcap/writer.rs +++ b/src/io/formats/mcap/writer.rs @@ -1079,32 +1079,15 @@ impl FormatWriter for ParallelMcapWriter> { ) -> Result { // Add schema if provided let schema_id = if let Some(schema_data) = schema { - // Use message_type directly as schema name (not message_type_schema) - // so the parser can find the correct type definition - let schema_name = message_type; - // Determine schema encoding based on message type - // ROS message types use "ros2msg" schema encoding with CDR message encoding - // JSON message types use "jsonschema" schema encoding with JSON message encoding - let schema_encoding = if encoding == "cdr" - && (message_type.contains('/') || message_type.contains("msg")) - { - "ros2msg" - } else if encoding == "json" { - "jsonschema" - } else if encoding == "protobuf" { - 
"protobuf" - } else { - // Default to message encoding for backward compatibility - encoding - }; - self.add_schema(schema_name, schema_encoding, schema_data.as_bytes())? + let schema_name = format!("{message_type}_schema"); + self.add_schema(&schema_name, encoding, schema_data.as_bytes())? } else { 0 }; // Use the internal add_channel method with empty metadata let empty_metadata = HashMap::new(); - self.add_channel(schema_id, topic, encoding, &empty_metadata) + self.add_channel(schema_id, topic, message_type, &empty_metadata) } fn write(&mut self, message: &RawMessage) -> Result<()> { From 019baae541f1cb1d89439e9940d5fbef98f38898 Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Mon, 9 Feb 2026 05:45:22 +0800 Subject: [PATCH 4/5] revert: remove test helper export from lib.rs --- src/lib.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c615e13..cecbbac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -301,9 +301,3 @@ pub trait Decoder: Send + Sync { // Python bindings (optional feature) #[cfg(feature = "python")] pub mod python; - -// Test helpers (only available when testing) -#[cfg(test)] -pub mod tests { - pub use crate::io::metadata::FileFormat; -} From bf1d8d652d63c24221f459f8065e160212be573e Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Mon, 9 Feb 2026 09:15:41 +0800 Subject: [PATCH 5/5] adding helper function --- tests/ros1_decode_dynamic_tests.rs | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/tests/ros1_decode_dynamic_tests.rs b/tests/ros1_decode_dynamic_tests.rs index ad115b4..9e9ad4a 100644 --- a/tests/ros1_decode_dynamic_tests.rs +++ b/tests/ros1_decode_dynamic_tests.rs @@ -33,6 +33,21 @@ fn fixture_path(name: &str) -> PathBuf { path } +/// Path to the ROS1 bag used by these tests. +/// +/// If the environment variable `ROBOCODEC_ROS1_BAG_PATH` is set and that path +/// exists, it is used (e.g. a full-size production bag). 
Otherwise the default +/// fixture `tests/fixtures/robocodec_test_24_leju_claw.bag` is used. +fn ros1_bag_path() -> PathBuf { + if let Ok(env_path) = std::env::var("ROBOCODEC_ROS1_BAG_PATH") { + let p = PathBuf::from(&env_path); + if p.exists() { + return p; + } + } + fixture_path("robocodec_test_24_leju_claw.bag") +} + /// Build a `SchemaMetadata` cache from channel info, exactly mirroring /// the production path in `roboflow-sources/src/decode.rs`. fn build_schema_cache( @@ -74,7 +89,7 @@ fn build_schema_cache( /// path that was broken before the fix. #[test] fn test_streaming_decode_dynamic_ros1_bag() { - let path = fixture_path("robocodec_test_24_leju_claw.bag"); + let path = ros1_bag_path(); if !path.exists() { eprintln!("Skipping: fixture not found at {}", path.display()); return; @@ -171,7 +186,7 @@ fn test_streaming_decode_dynamic_ros1_bag() { /// "String length 1600351329 exceeds maximum" error before the fix. #[test] fn test_decode_dynamic_leju_claw_state_topic() { - let path = fixture_path("robocodec_test_24_leju_claw.bag"); + let path = ros1_bag_path(); if !path.exists() { eprintln!("Skipping: fixture not found at {}", path.display()); return; @@ -251,7 +266,7 @@ fn test_decode_dynamic_leju_claw_state_topic() { /// Header, exercising the same seq-field path as `/leju_claw_state`. #[test] fn test_decode_dynamic_sensors_data_raw_topic() { - let path = fixture_path("robocodec_test_24_leju_claw.bag"); + let path = ros1_bag_path(); if !path.exists() { eprintln!("Skipping: fixture not found at {}", path.display()); return; @@ -331,7 +346,7 @@ fn test_decode_dynamic_sensors_data_raw_topic() { /// reading (no null terminator). Uses fixture `robocodec_test_24_leju_claw.bag`. 
#[test] fn test_decode_dynamic_tf_topic() { - let path = fixture_path("robocodec_test_24_leju_claw.bag"); + let path = ros1_bag_path(); if !path.exists() { eprintln!("Skipping: fixture not found at {}", path.display()); return; @@ -407,7 +422,7 @@ fn test_decode_dynamic_tf_topic() { #[test] fn test_ros1_bag_schema_encoding_detected() { - let path = fixture_path("robocodec_test_24_leju_claw.bag"); + let path = ros1_bag_path(); if !path.exists() { eprintln!("Skipping: fixture not found at {}", path.display()); return;