Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/encoding/cdr/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,9 @@ impl CdrDecoder {
}

/// Read a string value (matches TS `CdrReader.string()`).
///
/// CDR/ROS2 strings: length includes null terminator, so we read `len-1` bytes + skip 1.
/// ROS1 strings: length is exact byte count (no null terminator), so we read all `len` bytes.
fn read_string(&self, cursor: &mut CdrCursor) -> CoreResult<CodecValue> {
// Read length prefix (4 bytes)
let len = cursor.read_u32()? as usize;
Expand All @@ -593,6 +596,19 @@ impl CdrDecoder {
)));
}

if cursor.is_ros1() {
// ROS1: length is exact byte count, no null terminator
if len == 0 {
return Ok(CodecValue::String(String::new()));
}
let string_bytes = cursor.read_bytes(len)?;
let s = std::str::from_utf8(string_bytes)
.map_err(|e| CodecError::parse("string utf8", format!("{e}")))?
.to_string();
return Ok(CodecValue::String(s));
}

// CDR/ROS2: length includes null terminator
if len <= 1 {
// Empty string (length 0 or 1 for just null terminator)
cursor.skip(len)?;
Expand Down
32 changes: 31 additions & 1 deletion src/schema/parser/unified.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,40 @@ pub fn parse_schema_with_encoding(
/// This adds standard ROS2 `builtin_interfaces` types like Time and Duration
/// to the schema, ensuring they're available when decoding messages that
/// reference them.
///
/// When a type already exists under a different naming convention from the
/// parsed message_definition (e.g., `std_msgs/Header` with `seq` from ROS1),
/// we skip adding the `/msg/` variant to avoid ambiguous short-name resolution
/// in `get_type_variants`. The parsed definition is authoritative.
fn populate_builtin_types(schema: &mut MessageSchema) {
// Snapshot the types that were parsed from the message_definition,
// BEFORE we start adding builtins. These are authoritative.
let parsed_types: std::collections::HashSet<String> = schema.types.keys().cloned().collect();

for builtin_type in builtin_types::get_all() {
// Only add if not already present (user schemas can override)
if !schema.types.contains_key(&builtin_type.name) {
if schema.types.contains_key(&builtin_type.name) {
continue;
}

// Skip adding a builtin variant if a naming variant was already
// PARSED from the message_definition (not from a previous builtin).
// e.g., skip std_msgs/msg/Header when std_msgs/Header was parsed
// from the ROS1 message_definition (which includes the seq field).
let parsed_variant_exists = if builtin_type.name.contains("/msg/") {
let without_msg = builtin_type.name.replace("/msg/", "/");
parsed_types.contains(&without_msg)
} else {
let parts: Vec<&str> = builtin_type.name.rsplitn(2, '/').collect();
if parts.len() == 2 {
let with_msg = format!("{}/msg/{}", parts[1], parts[0]);
parsed_types.contains(&with_msg)
} else {
false
}
};

if !parsed_variant_exists {
schema.add_type(builtin_type);
}
}
Expand Down
105 changes: 101 additions & 4 deletions tests/ros1_decode_dynamic_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ fn fixture_path(name: &str) -> PathBuf {
path
}

/// Path to the ROS1 bag used by these tests.
///
/// If the environment variable `ROBOCODEC_ROS1_BAG_PATH` is set and that path
/// exists, it is used (e.g. a full-size production bag). Otherwise the default
/// fixture `tests/fixtures/robocodec_test_24_leju_claw.bag` is used.
fn ros1_bag_path() -> PathBuf {
if let Ok(env_path) = std::env::var("ROBOCODEC_ROS1_BAG_PATH") {
let p = PathBuf::from(&env_path);
if p.exists() {
return p;
}
}
fixture_path("robocodec_test_24_leju_claw.bag")
}

/// Build a `SchemaMetadata` cache from channel info, exactly mirroring
/// the production path in `roboflow-sources/src/decode.rs`.
fn build_schema_cache(
Expand Down Expand Up @@ -74,7 +89,7 @@ fn build_schema_cache(
/// path that was broken before the fix.
#[test]
fn test_streaming_decode_dynamic_ros1_bag() {
let path = fixture_path("robocodec_test_24_leju_claw.bag");
let path = ros1_bag_path();
if !path.exists() {
eprintln!("Skipping: fixture not found at {}", path.display());
return;
Expand Down Expand Up @@ -171,7 +186,7 @@ fn test_streaming_decode_dynamic_ros1_bag() {
/// "String length 1600351329 exceeds maximum" error before the fix.
#[test]
fn test_decode_dynamic_leju_claw_state_topic() {
let path = fixture_path("robocodec_test_24_leju_claw.bag");
let path = ros1_bag_path();
if !path.exists() {
eprintln!("Skipping: fixture not found at {}", path.display());
return;
Expand Down Expand Up @@ -251,7 +266,7 @@ fn test_decode_dynamic_leju_claw_state_topic() {
/// Header, exercising the same seq-field path as `/leju_claw_state`.
#[test]
fn test_decode_dynamic_sensors_data_raw_topic() {
let path = fixture_path("robocodec_test_24_leju_claw.bag");
let path = ros1_bag_path();
if !path.exists() {
eprintln!("Skipping: fixture not found at {}", path.display());
return;
Expand Down Expand Up @@ -319,13 +334,95 @@ fn test_decode_dynamic_sensors_data_raw_topic() {
);
}

// ============================================================================
// Targeted: /tf must decode (tf2_msgs/TFMessage with nested Header)
// ============================================================================

/// Ensure `/tf` (tf2_msgs/TFMessage) messages decode successfully.
///
/// TFMessage contains a dynamic array of geometry_msgs/TransformStamped,
/// each with std_msgs/Header (seq, stamp, frame_id), child_frame_id string,
/// and Transform. This exercises nested Header resolution and ROS1 string
/// reading (no null terminator). Uses fixture `robocodec_test_24_leju_claw.bag`.
#[test]
fn test_decode_dynamic_tf_topic() {
let path = ros1_bag_path();
if !path.exists() {
eprintln!("Skipping: fixture not found at {}", path.display());
return;
}

let data = std::fs::read(&path).expect("Failed to read fixture bag");

let mut parser = StreamingBagParser::new();
let mut all_records = Vec::new();

for chunk in data.chunks(64 * 1024) {
let records = parser.parse_chunk(chunk).expect("parse_chunk failed");
all_records.extend(records);
}

let channels = parser.channels();
let factory = CodecFactory::new();
let schema_cache = build_schema_cache(&channels, &factory);

let mut tf_decoded = 0usize;
let mut tf_errors = Vec::new();

for record in &all_records {
let channel_id = record.conn_id as u16;
let Some(channel_info) = channels.get(&channel_id) else {
continue;
};

if channel_info.topic != "/tf" {
continue;
}

let schema = schema_cache
.get(&channel_id)
.expect("Should have schema for /tf");

let codec = factory
.get_codec(schema.encoding())
.expect("Should have CDR codec");

match codec.decode_dynamic(&record.data, schema) {
Ok(decoded) => {
tf_decoded += 1;
// TFMessage has "transforms" array
assert!(!decoded.is_empty(), "Decoded TFMessage should have fields");
assert!(
decoded.contains_key("transforms"),
"Decoded TFMessage should have 'transforms' field"
);
}
Err(e) => {
tf_errors.push(format!("{e}"));
}
}
}

assert!(
tf_decoded > 0,
"Expected at least one /tf message to decode, but got 0 (errors: {:?})",
tf_errors
);
assert!(
tf_errors.is_empty(),
"All /tf messages should decode, but {} failed: {:?}",
tf_errors.len(),
tf_errors
);
}

// ============================================================================
// Verify ROS1 schema encoding is detected for CDR channels
// ============================================================================

#[test]
fn test_ros1_bag_schema_encoding_detected() {
let path = fixture_path("robocodec_test_24_leju_claw.bag");
let path = ros1_bag_path();
if !path.exists() {
eprintln!("Skipping: fixture not found at {}", path.display());
return;
Expand Down