Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions artefacts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,25 @@ project: artefacts/go2-isaac-example
version: 0.1.0

jobs:
unit_tests:
type: test
timeout: 15 # minutes
package:
docker:
build:
dockerfile: empty
runtime:
framework: other
simulator: isaac_sim

scenarios:
settings:
- name: navigator_unit_tests
pytest_file: "nodes/navigator/tests"

- name: policy_controller_unit_tests
pytest_file: "nodes/policy_controller/tests"

waypoint_missions:
type: test
timeout: 20 # minutes
Expand All @@ -14,8 +33,6 @@ jobs:
simulator: isaac_sim

scenarios:
# Reinstall artefacts-click as a temporary workaround for conflicting click versions
# see https://github.com/art-e-fact/artefacts-client/issues/370
settings:
- name: report_based_waypoint_mission_test
run: "rm -rf outputs/artefacts && uv run dataflow --test-waypoint-report"
Expand Down
163 changes: 163 additions & 0 deletions nodes/navigator/tests/test_navigator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
"""Test module for navigator package."""

import queue
import threading
from unittest.mock import MagicMock, patch

import numpy as np
import pyarrow as pa
import pytest
from msgs import Transform, Twist2D, Waypoint, WaypointList, WaypointStatus


def test_import_main():
Expand All @@ -11,3 +18,159 @@ def test_import_main():
# as we're not running in a Dora dataflow.
with pytest.raises(RuntimeError):
main()


def test_navigator_logic():
"""Test the main navigator logic with mocked inputs."""
# Patch dora.Node in the context of the navigator.main module
with patch("navigator.main.Node") as MockNode:
mock_node_instance = MagicMock()
MockNode.return_value = mock_node_instance

# Define the inputs that the node will receive.
robot_pose = Transform.from_position_and_quaternion(
np.array([0.0, 0.0, 0.0]),
np.array([1.0, 0.0, 0.0, 0.0]), # scalar-first quaternion (w, x, y, z)
)
waypoints = WaypointList(
[
Waypoint(
transform=Transform.from_position_and_quaternion(
np.array([10.0, 0.0, 0.0]), np.array([1.0, 0.0, 0.0, 0.0])
),
status=WaypointStatus.ACTIVE,
)
]
)

# The mocked node will yield these events when iterated.
mock_node_instance.__iter__.return_value = [
{"type": "INPUT", "id": "robot_pose", "value": robot_pose.to_arrow()},
{"type": "INPUT", "id": "waypoints", "value": waypoints.to_arrow()},
{
"type": "INPUT",
"id": "tick",
"value": pa.array([]), # The tick value is not used.
},
{
"type": "INPUT",
"id": "stop", # Stop the loop
},
]

from navigator.main import main

main()

# Check that send_output was called correctly.
# The robot is at the origin, facing the goal at (10, 0).
# It should command maximum forward velocity and no angular velocity.
expected_command = Twist2D(linear_x=1.0, linear_y=0.0, angular_z=0.0)

# mock_node_instance.send_output.assert_called_once() # Fails if called more than once
mock_node_instance.send_output.assert_called_with(
"command_2d", expected_command.to_arrow()
)


def test_navigator_logic_stateful():
"""Test if sends command output after every tick input."""
# A queue to send events to the node
event_queue = queue.Queue()

with patch("navigator.main.Node") as MockNode:
mock_node_instance = MagicMock()
MockNode.return_value = mock_node_instance
mock_node_instance.__iter__.return_value = iter(event_queue.get, None)

from navigator.main import main

# Run the main function in a separate thread
main_thread = threading.Thread(target=main, daemon=True)
main_thread.start()

# Set initial robot pose and waypoints
robot_pose = Transform.from_position_and_quaternion(
np.array([0.0, 0.0, 0.0]),
np.array([1.0, 0.0, 0.0, 0.0]), # scalar-first (w, x, y, z)
)
waypoints = WaypointList(
[
Waypoint(
transform=Transform.from_position_and_quaternion(
np.array([10.0, 0.0, 0.0]), np.array([1.0, 0.0, 0.0, 0.0])
),
status=WaypointStatus.ACTIVE,
)
]
)
event_queue.put(
{"type": "INPUT", "id": "robot_pose", "value": robot_pose.to_arrow()}
)
event_queue.put(
{"type": "INPUT", "id": "waypoints", "value": waypoints.to_arrow()}
)

# Send a tick to trigger a command calculation
event_queue.put({"type": "INPUT", "id": "tick", "value": pa.array([])})

threading.Event().wait(0.1) # Wait for async operations
mock_node_instance.send_output.assert_called_once()
args, _ = mock_node_instance.send_output.call_args
assert args[0] == "command_2d"
command_output = Twist2D.from_arrow(args[1])
assert command_output.linear_x > 0, (
f"Expected to go towards X direction, got {command_output}"
)
assert command_output.angular_z == 0.0, (
f"Expected to go straight, got {command_output}"
)

# Reset mock to check for the next call
mock_node_instance.send_output.reset_mock()

# Set a new robot pose
new_robot_pose = Transform.from_position_and_quaternion(
np.array([10.0, 10.0, 0.0]),
np.array([0.707, 0.0, 0.0, 0.707]), # Rotated 90 degrees (facing +y)
)
event_queue.put(
{"type": "INPUT", "id": "robot_pose", "value": new_robot_pose.to_arrow()}
)

# Send another tick
event_queue.put({"type": "INPUT", "id": "tick", "value": pa.array([])})

# Check the new output
threading.Event().wait(0.1) # Wait for async operations
mock_node_instance.send_output.assert_called_once()
args, _ = mock_node_instance.send_output.call_args
assert args[0] == "command_2d"
command_output = Twist2D.from_arrow(args[1])
assert command_output.angular_z > 0.0, f"Expected to turn, got {command_output}"

event_queue.put(None) # Signal the iterator to end
main_thread.join(timeout=1) # Wait for the thread to finish


def test_stop_after_stop_signal():
"""Test that the navigator stops after receiving a stop signal."""
event_queue = queue.Queue()

with patch("navigator.main.Node") as MockNode:
mock_node_instance = MagicMock()
MockNode.return_value = mock_node_instance
mock_node_instance.__iter__.return_value = iter(event_queue.get, None)

from navigator.main import main

# Start the main function in a separate thread
main_thread = threading.Thread(target=main, daemon=True)
main_thread.start()

# Send stop signal
event_queue.put({"type": "INPUT", "id": "stop"})
threading.Event().wait(0.1)

# Check if the thread has finished
assert not main_thread.is_alive(), "Main thread did not stop after stop signal"
46 changes: 46 additions & 0 deletions nodes/policy_controller/tests/test_policy_controller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Test module for policy_controller package."""

from unittest.mock import MagicMock, patch

import numpy as np
import pytest
from msgs import JointCommands, Observations, Timestamp, Twist2D


def test_import_main():
Expand All @@ -11,3 +15,45 @@ def test_import_main():
# as we're not running in a Dora dataflow.
with pytest.raises(RuntimeError):
main()


def test_generates_commands():
"""Check if the policy runs and generates joint commands."""
with patch("policy_controller.main.Node") as MockNode:
mock_node_instance = MagicMock()
MockNode.return_value = mock_node_instance

# Create mock inputs
command_2d = Twist2D(linear_x=0.5, linear_y=0.0, angular_z=0.0)
observations = Observations(
lin_vel=np.zeros(3),
ang_vel=np.zeros(3),
gravity=np.array([0.0, 0.0, -9.81]),
joint_positions=np.zeros(12),
joint_velocities=np.zeros(12),
height_scan=np.zeros(154),
)
clock = Timestamp.now()

# The mocked node will yield these events when iterated.
mock_node_instance.__iter__.return_value = [
{"type": "INPUT", "id": "command_2d", "value": command_2d.to_arrow()},
{"type": "INPUT", "id": "clock", "value": clock.to_arrow()},
{
"type": "INPUT",
"id": "observations",
"value": observations.to_arrow(),
},
]

from policy_controller.main import main

main()

# Check that send_output was called with joint_commands
mock_node_instance.send_output.assert_called()
args, _ = mock_node_instance.send_output.call_args
assert args[0] == "joint_commands"
joint_commands = JointCommands.from_arrow(args[1])
assert joint_commands.positions is not None
assert len(joint_commands.positions) == 12