Skip to content

Commit

Permalink
2024 Husarion
Browse files Browse the repository at this point in the history
  • Loading branch information
rafal-gorecki committed Jan 29, 2024
1 parent 1fb0a56 commit 84335fd
Show file tree
Hide file tree
Showing 27 changed files with 177 additions and 1,574 deletions.
35 changes: 30 additions & 5 deletions rosbot_xl_bringup/test/test_diff_drive_ekf_and_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
import launch_pytest
import pytest
import rclpy
import os
import random

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.substitutions import PathJoinSubstitution
from launch.launch_description_sources import PythonLaunchDescriptionSource
from test_utils import BringupTestNode, ekf_and_scan_test
from threading import Thread
from test_utils import BringupTestNode


@launch_pytest.fixture
def generate_test_description():
proc_env = os.environ.copy()
proc_env["ROS_LOCALHOST_ONLY"] = "1"
proc_env["ROS_DOMAIN_ID"] = random.randint(0, 255)

rosbot_xl_bringup = get_package_share_directory("rosbot_xl_bringup")
bringup_launch = IncludeLaunchDescription(
Expand All @@ -55,11 +59,32 @@ def test_bringup_startup_success():
rclpy.init()
try:
node = BringupTestNode("test_bringup")
node.create_test_subscribers_and_publishers()
node.start_publishing_fake_hardware()

Thread(target=lambda node: rclpy.spin(node), args=(node,)).start()
ekf_and_scan_test(node)
node.destroy_node()
node.start_node_thread()
msgs_received_flag = node.odom_tf_event.wait(timeout=10.0)
assert (
msgs_received_flag
), "Expected odom to base_link tf but it was not received. Check robot_localization!"

finally:
rclpy.shutdown()


@pytest.mark.launch(fixture=generate_test_description)
def test_bringup_scan_filter():
rclpy.init()
try:
node = BringupTestNode("test_bringup")
node.create_test_subscribers_and_publishers()
node.start_publishing_fake_hardware()

node.start_node_thread()
msgs_received_flag = node.scan_filter_event.wait(timeout=10.0)
assert (
msgs_received_flag
), "Expected filtered scan but it is not filtered properly. Check laser_filter!"

finally:
rclpy.shutdown()
35 changes: 30 additions & 5 deletions rosbot_xl_bringup/test/test_mecanum_ekf_and_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
import launch_pytest
import pytest
import rclpy
import os
import random

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.substitutions import PathJoinSubstitution
from launch.launch_description_sources import PythonLaunchDescriptionSource
from test_utils import BringupTestNode, ekf_and_scan_test
from threading import Thread
from test_utils import BringupTestNode


@launch_pytest.fixture
def generate_test_description():
proc_env = os.environ.copy()
proc_env["ROS_LOCALHOST_ONLY"] = "1"
proc_env["ROS_DOMAIN_ID"] = random.randint(0, 255)

rosbot_xl_bringup = get_package_share_directory("rosbot_xl_bringup")
bringup_launch = IncludeLaunchDescription(
Expand All @@ -55,11 +59,32 @@ def test_bringup_startup_success():
rclpy.init()
try:
node = BringupTestNode("test_bringup")
node.create_test_subscribers_and_publishers()
node.start_publishing_fake_hardware()

Thread(target=lambda node: rclpy.spin(node), args=(node,)).start()
ekf_and_scan_test(node)
node.destroy_node()
node.start_node_thread()
msgs_received_flag = node.odom_tf_event.wait(timeout=10.0)
assert (
msgs_received_flag
), "Expected odom to base_link tf but it was not received. Check robot_localization!"

finally:
rclpy.shutdown()


@pytest.mark.launch(fixture=generate_test_description)
def test_bringup_scan_filter():
rclpy.init()
try:
node = BringupTestNode("test_bringup")
node.create_test_subscribers_and_publishers()
node.start_publishing_fake_hardware()

node.start_node_thread()
msgs_received_flag = node.scan_filter_event.wait(timeout=10.0)
assert (
msgs_received_flag
), "Expected filtered scan but it is not filtered properly. Check laser_filter!"

finally:
rclpy.shutdown()
57 changes: 36 additions & 21 deletions rosbot_xl_bringup/test/test_multirobot_ekf_and_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,25 @@
import launch_pytest
import pytest
import rclpy
import os
import random

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.substitutions import PathJoinSubstitution
from launch.launch_description_sources import PythonLaunchDescriptionSource
from rclpy.executors import SingleThreadedExecutor
from test_utils import BringupTestNode, ekf_and_scan_test
from threading import Thread
from test_utils import BringupTestNode


robot_names = ["rosbot1", "rosbot2", "rosbot3"]


@launch_pytest.fixture
def generate_test_description():
proc_env = os.environ.copy()
proc_env["ROS_LOCALHOST_ONLY"] = "1"
proc_env["ROS_DOMAIN_ID"] = random.randint(0, 255)

rosbot_xl_bringup = get_package_share_directory("rosbot_xl_bringup")
actions = []
Expand Down Expand Up @@ -60,25 +63,37 @@ def generate_test_description():

@pytest.mark.launch(fixture=generate_test_description)
def test_namespaced_bringup_startup_success():
rclpy.init()
try:
nodes = {}
executor = SingleThreadedExecutor()

for node_name in robot_names:
node = BringupTestNode("test_bringup", namespace=node_name)
for robot_name in robot_names:
rclpy.init()
try:
node = BringupTestNode("test_bringup", namespace=robot_name)
node.create_test_subscribers_and_publishers()
node.start_publishing_fake_hardware()
nodes[node_name] = node
executor.add_node(node)

Thread(target=lambda executor: executor.spin(), args=(executor,)).start()
node.start_node_thread()
msgs_received_flag = node.odom_tf_event.wait(timeout=30.0)
assert (
msgs_received_flag
), "Expected odom to base_link tf but it was not received. Check robot_localization!"

finally:
rclpy.shutdown()


@pytest.mark.launch(fixture=generate_test_description)
def test_namespaced_bringup_scan_filter():
for robot_name in robot_names:
rclpy.init()
try:
node = BringupTestNode("test_bringup", namespace=robot_name)
node.create_test_subscribers_and_publishers()
node.start_publishing_fake_hardware()

for node_name in robot_names:
node = nodes[node_name]
ekf_and_scan_test(node, node_name)
executor.remove_node(node)
node.destroy_node()
node.start_node_thread()
msgs_received_flag = node.scan_filter_event.wait(timeout=30.0)
assert (
msgs_received_flag
), "Expected filtered scan but it is not filtered properly. Check laser_filter!"

finally:
executor.shutdown()
rclpy.shutdown()
finally:
rclpy.shutdown()
39 changes: 32 additions & 7 deletions rosbot_xl_bringup/test/test_namespaced_diff_drive_ekf_and_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
import launch_pytest
import pytest
import rclpy
import os
import random

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.substitutions import PathJoinSubstitution
from launch.launch_description_sources import PythonLaunchDescriptionSource
from test_utils import BringupTestNode, ekf_and_scan_test
from threading import Thread
from test_utils import BringupTestNode


@launch_pytest.fixture
def generate_test_description():
proc_env = os.environ.copy()
proc_env["ROS_LOCALHOST_ONLY"] = "1"
proc_env["ROS_DOMAIN_ID"] = random.randint(0, 255)

rosbot_xl_bringup = get_package_share_directory("rosbot_xl_bringup")
bringup_launch = IncludeLaunchDescription(
Expand All @@ -44,7 +48,7 @@ def generate_test_description():
launch_arguments={
"use_sim": "False",
"mecanum": "False",
"namespace": "rosbot_xl",
"namespace": "rosbotxl",
}.items(),
)

Expand All @@ -55,12 +59,33 @@ def generate_test_description():
def test_namespaced_bringup_startup_success():
rclpy.init()
try:
node = BringupTestNode("test_bringup", namespace="rosbot_xl")
node = BringupTestNode("test_bringup", namespace="rosbotxl")
node.create_test_subscribers_and_publishers()
node.start_publishing_fake_hardware()

Thread(target=lambda node: rclpy.spin(node), args=(node,)).start()
ekf_and_scan_test(node)
node.destroy_node()
node.start_node_thread()
msgs_received_flag = node.odom_tf_event.wait(timeout=10.0)
assert (
msgs_received_flag
), "Expected odom to base_link tf but it was not received. Check robot_localization!"

finally:
rclpy.shutdown()


@pytest.mark.launch(fixture=generate_test_description)
def test_namespaced_bringup_scan_filter():
rclpy.init()
try:
node = BringupTestNode("test_bringup", namespace="rosbotxl")
node.create_test_subscribers_and_publishers()
node.start_publishing_fake_hardware()

node.start_node_thread()
msgs_received_flag = node.scan_filter_event.wait(timeout=10.0)
assert (
msgs_received_flag
), "Expected filtered scan but it is not filtered properly. Check laser_filter!"

finally:
rclpy.shutdown()
39 changes: 32 additions & 7 deletions rosbot_xl_bringup/test/test_namespaced_mecanum_ekf_and_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
import launch_pytest
import pytest
import rclpy
import os
import random

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.substitutions import PathJoinSubstitution
from launch.launch_description_sources import PythonLaunchDescriptionSource
from test_utils import BringupTestNode, ekf_and_scan_test
from threading import Thread
from test_utils import BringupTestNode


@launch_pytest.fixture
def generate_test_description():
proc_env = os.environ.copy()
proc_env["ROS_LOCALHOST_ONLY"] = "1"
proc_env["ROS_DOMAIN_ID"] = random.randint(0, 255)

rosbot_xl_bringup = get_package_share_directory("rosbot_xl_bringup")
bringup_launch = IncludeLaunchDescription(
Expand All @@ -44,7 +48,7 @@ def generate_test_description():
launch_arguments={
"use_sim": "False",
"mecanum": "True",
"namespace": "rosbot_xl",
"namespace": "rosbotxl",
}.items(),
)

Expand All @@ -55,12 +59,33 @@ def generate_test_description():
def test_namespaced_bringup_startup_success():
rclpy.init()
try:
node = BringupTestNode("test_bringup", namespace="rosbot_xl")
node = BringupTestNode("test_bringup", namespace="rosbotxl")
node.create_test_subscribers_and_publishers()
node.start_publishing_fake_hardware()

Thread(target=lambda node: rclpy.spin(node), args=(node,)).start()
ekf_and_scan_test(node)
node.destroy_node()
node.start_node_thread()
msgs_received_flag = node.odom_tf_event.wait(timeout=10.0)
assert (
msgs_received_flag
), "Expected odom to base_link tf but it was not received. Check robot_localization!"

finally:
rclpy.shutdown()


@pytest.mark.launch(fixture=generate_test_description)
def test_namespaced_bringup_scan_filter():
rclpy.init()
try:
node = BringupTestNode("test_bringup", namespace="rosbotxl")
node.create_test_subscribers_and_publishers()
node.start_publishing_fake_hardware()

node.start_node_thread()
msgs_received_flag = node.scan_filter_event.wait(timeout=10.0)
assert (
msgs_received_flag
), "Expected filtered scan but it is not filtered properly. Check laser_filter!"

finally:
rclpy.shutdown()
Loading

0 comments on commit 84335fd

Please sign in to comment.