Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion artefacts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ jobs:
- name: pose_based_waypoint_mission_test
params:
policy:
- stumbling
- baseline
- stumbling
metrics: metrics.json
run: "uv run dataflow --test-waypoint-poses"

1 change: 1 addition & 0 deletions dataflow/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
"dora-rs-cli>=0.3.13",
"typer>=0.20.0",
"artefacts-toolkit>=0.7.1",
"pytest-custom-exit-code>=0.3.0",
]

[project.scripts]
Expand Down
12 changes: 11 additions & 1 deletion dataflow/src/dataflow/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,21 @@ def run_dataflow(
junit_xml_path = (
output_path / ".." / "tests_junit.xml"
) # Save junit in the root outputs folder

tester = dataflow.add_node(
id="tester",
path="pytest",
args=f"{nodes_path / 'tester/tester' / test} -s --junit-xml={str(junit_xml_path)}",
args=" ".join(
[
f"{nodes_path / 'tester/tester' / test} -s",
f"--junit-xml={str(junit_xml_path)}",
# Avoid failing the dataflow on test failures
# https://docs.pytest.org/en/stable/reference/exit-codes.html
"--suppress-tests-failed-exit-code",
]
),
)

tester.add_input("waypoints", "simulation/waypoints")
tester.add_input("scene_info", "simulation/scene_info")
tester.add_input("robot_pose", "simulation/robot_pose")
Expand Down
4 changes: 4 additions & 0 deletions msgs/src/msgs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def now(cls) -> "Timestamp":
def float_seconds(self) -> float:
return self.seconds + self.nanoseconds / 1e9

@property
def float_milliseconds(self) -> float:
return self.float_seconds * 1e3


@dataclass
class Transform(ArrowMessage):
Expand Down
27 changes: 27 additions & 0 deletions nodes/tester/tester/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# noqa: D100
import os
from pathlib import Path

import msgs
import pyarrow as pa
import pytest
Expand Down Expand Up @@ -74,3 +77,27 @@ def node(request, session_node: TestNode):
session_node.set_timeout(clock_timeout.args[0])
yield session_node
session_node.reset_timeout()


@pytest.fixture(scope="session")
def metrics():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually thats a good point. Given we now have better support for usign metrics.json maybe we need a toolkit helper to actually help make / populate it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could be nice! Right now, i only add metrics from the test file, but mainly because it would need a more complex util to update metrics safely from multiple processes. It would be great if the toolkit could help with that.

"""Collect test metrics during the test session."""
metrics = {}

yield metrics

workspace_path = Path(__file__).parent.parent.parent.parent
metrics_path = (
Path(
os.getenv(
"ARTEFACTS_SCENARIO_UPLOAD_DIR", workspace_path / "outputs/artefacts"
)
)
/ ".."
/ "metrics.json"
)
metrics_path.parent.mkdir(parents=True, exist_ok=True)
with open(metrics_path, "w") as f:
import json

json.dump(metrics, f, indent=2)
52 changes: 52 additions & 0 deletions nodes/tester/tester/stuck_detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Helper class to detect if the robot is stuck."""

import msgs


class StuckDetector:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think this is relatively generic? (toolkit?)

Copy link
Member Author

@azazdeaz azazdeaz Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this, but probably every use-case is different enough so you cant use the same thing unless we make it complex with a lot of parameters. Might be better to have recepie-book or something with testing tricks like this.

"""Helper class to detect if the robot is stuck."""

def __init__(self, threshold: float = 0.1, max_no_progress_time: float = 3.0):
"""Initialize the stuck detector.

Args:
threshold: Distance threshold in meters to consider as progress.
max_no_progress_time: Maximum time in seconds without progress before
considering the robot as stuck.

"""
self.threshold = threshold
self.max_no_progress_time = max_no_progress_time
self.last_position = None
self.last_progress_time = None

def step_is_stuck(self, position: msgs.Transform, current_time: float) -> bool:
"""Update the detector with the current position.

Returns True if the robot is considered stuck.
"""
if self.last_position is None or self.last_progress_time is None:
self.last_position = position
self.last_progress_time = current_time
return False

distance_moved = (
(position.x - self.last_position.x) ** 2
+ (position.y - self.last_position.y) ** 2
+ (position.z - self.last_position.z) ** 2
) ** 0.5

if distance_moved >= self.threshold:
self.last_position = position
self.last_progress_time = current_time
return False
else:
if current_time - self.last_progress_time > self.max_no_progress_time:
print(
"Robot is stuck: no significant movement detected."
f" Distance moved: {distance_moved:.3f} m in"
f" {current_time - self.last_progress_time:.2f} s."
)
return True

return False
48 changes: 39 additions & 9 deletions nodes/tester/tester/test_waypoints_poses.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
"""Dora node that test waypoint mission completions using the robot pose."""

import time

import msgs
import pytest

from tester.stuck_detector import StuckDetector
from tester.transforms import Transforms


Expand All @@ -20,22 +19,28 @@ def test_receives_scene_info_on_startup(node):

@pytest.mark.parametrize("difficulty", [0.1, 0.7, 1.1])
@pytest.mark.clock_timeout(30)
def test_completes_waypoint_mission_with_variable_height_steps(node, difficulty: float):
def test_completes_waypoint_mission_with_variable_height_steps(
node, difficulty: float, metrics: dict
):
"""Test that the waypoint mission completes successfully.

The pyramid steps height is configured via difficulty.
"""
run_waypoint_mission_test(node, scene="generated_pyramid", difficulty=difficulty)
run_waypoint_mission_test(
node, scene="generated_pyramid", difficulty=difficulty, metrics=metrics
)


@pytest.mark.parametrize("scene", ["rail_blocks", "stone_stairs", "excavator"])
@pytest.mark.clock_timeout(30)
def test_completes_waypoint_mission_in_photo_realistic_env(node, scene: str):
def test_completes_waypoint_mission_in_photo_realistic_env(
node, scene: str, metrics: dict
):
"""Test that the waypoint mission completes successfully."""
run_waypoint_mission_test(node, scene, difficulty=1.0)
run_waypoint_mission_test(node, scene, difficulty=1.0, metrics=metrics)


def run_waypoint_mission_test(node, scene: str, difficulty: float):
def run_waypoint_mission_test(node, scene: str, difficulty: float, metrics: dict):
"""Run the waypoint mission test."""
transforms = Transforms()
node.send_output(
Expand All @@ -45,7 +50,22 @@ def run_waypoint_mission_test(node, scene: str, difficulty: float):
waypoint_list: list[str] = []
next_waypoint_index = 0

metrics_key = f"completion_time.{scene}_{difficulty}"
start_time_ms = None
current_time_ms = None
stuck_detector = StuckDetector(max_no_progress_time=5000) # 5 seconds

for event in node:
if event["type"] == "INPUT" and event["id"] == "clock":
now = msgs.Timestamp.from_arrow(event["value"]).float_milliseconds
if start_time_ms is None or now < start_time_ms:
start_time_ms = now
current_time_ms = now

# Bail if we haven't started yet
if current_time_ms is None:
continue

if event["type"] == "INPUT" and event["id"] == "waypoints":
waypoint_list_msg = msgs.WaypointList.from_arrow(event["value"])
waypoints = waypoint_list_msg.waypoints
Expand All @@ -60,7 +80,7 @@ def run_waypoint_mission_test(node, scene: str, difficulty: float):

transforms.add_transform(
wp.transform,
int(time.time()),
int(current_time_ms),
parent_frame="world",
child_frame=waypoint_frame,
)
Expand All @@ -71,7 +91,7 @@ def run_waypoint_mission_test(node, scene: str, difficulty: float):
continue

transform = msgs.Transform.from_arrow(event["value"])
timestamp = int(time.time())
timestamp = int(current_time_ms)
transforms.add_transform(
transform,
timestamp,
Expand All @@ -95,3 +115,13 @@ def run_waypoint_mission_test(node, scene: str, difficulty: float):
else:
print("All waypoints completed!")
break

if stuck_detector.step_is_stuck(transform, timestamp):
raise AssertionError(
"Robot is stuck and cannot complete the waypoint mission."
)

if start_time_ms is not None and current_time_ms is not None:
elapsed_time_ms = current_time_ms - start_time_ms
# Convert to seconds
metrics[metrics_key] = elapsed_time_ms / 1000.0
14 changes: 14 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.