17
17
import unittest
18
18
19
19
from action_msgs .msg import GoalStatus
20
+ from ament_index_python .packages import get_package_prefix
20
21
from geometry_msgs .msg import TransformStamped , Twist , TwistStamped
21
22
from launch import LaunchDescription
23
+ from launch .actions import (
24
+ LogInfo ,
25
+ SetEnvironmentVariable ,
26
+ )
22
27
from launch_ros .actions import Node
23
28
import launch_testing
29
+ import launch_testing .actions
30
+ import launch_testing .asserts
31
+ import launch_testing .markers
32
+ import launch_testing .util
24
33
from nav2_msgs .action import DockRobot , NavigateToPose , UndockRobot
25
34
import pytest
26
35
import rclpy
27
- import rclpy .time
28
- import rclpy .duration
29
36
from rclpy .action .client import ActionClient
30
37
from rclpy .action .server import ActionServer
38
+ import rclpy .duration
39
+ import rclpy .time
31
40
from sensor_msgs .msg import BatteryState
32
- from tf2_ros import TransformBroadcaster , Buffer , TransformListener
41
+ from tf2_ros import Buffer , TransformBroadcaster , TransformListener
33
42
34
43
35
44
# This test can be run standalone with:
36
45
# python3 -u -m pytest test_docking_server.py -s
37
46
38
47
@pytest .mark .rostest
48
+ @launch_testing .markers .keep_alive
39
49
def generate_test_description ():
40
-
41
- return LaunchDescription ([
42
- Node (
50
+ tmux_gdb_prefix = (
51
+ 'tmux split-window '
52
+ + get_package_prefix ('nav2_bringup' )
53
+ + '/lib/nav2_bringup/gdb_tmux_splitwindow.sh'
54
+ )
55
+ logme = LogInfo (msg = f'tmux_gdb_prefix={ tmux_gdb_prefix } ' )
56
+ docking_server = Node (
43
57
package = 'opennav_docking' ,
44
58
executable = 'opennav_docking' ,
45
59
name = 'docking_server' ,
60
+ # arguments=['--ros-args', '--log-level', 'debug'],
46
61
parameters = [{'wait_charge_timeout' : 1.0 ,
47
62
'controller' : {
48
63
'use_collision_detection' : False ,
@@ -59,17 +74,42 @@ def generate_test_description():
59
74
'pose' : [10.0 , 0.0 , 0.0 ]
60
75
}}],
61
76
output = 'screen' ,
62
- ),
63
- Node (
77
+ )
78
+ lifecycle_manager_navigation = Node (
64
79
package = 'nav2_lifecycle_manager' ,
65
80
executable = 'lifecycle_manager' ,
66
81
name = 'lifecycle_manager_navigation' ,
67
82
output = 'screen' ,
83
+ # prefix=tmux_gdb_prefix,
68
84
parameters = [{'autostart' : True },
69
85
{'node_names' : ['docking_server' ]}]
70
- ),
86
+ )
87
+
88
+ return LaunchDescription ([
89
+ SetEnvironmentVariable ('RCUTILS_LOGGING_BUFFERED_STREAM' , '1' ),
90
+ SetEnvironmentVariable ('RCUTILS_LOGGING_USE_STDOUT' , '1' ),
91
+ logme ,
92
+ docking_server ,
93
+ lifecycle_manager_navigation ,
94
+ # In tests where all of the procs under tests terminate themselves, it's necessary
95
+ # to add a dummy process not under test to keep the launch alive. launch_test
96
+ # provides a simple launch action that does this:
97
+ launch_testing .util .KeepAliveProc (),
71
98
launch_testing .actions .ReadyToTest (),
72
- ])
99
+ ]), locals ()
100
+
101
+
102
+ @launch_testing .post_shutdown_test ()
103
+ class TestPostShutdown (unittest .TestCase ):
104
+
105
+ def test_action_graceful_shutdown (self ,
106
+ proc_info ,
107
+ docking_server ,
108
+ lifecycle_manager_navigation ):
109
+ """Test graceful shutdown."""
110
+ launch_testing .asserts .assertExitCodes (proc_info , process = docking_server )
111
+ launch_testing .asserts .assertExitCodes (proc_info ,
112
+ process = lifecycle_manager_navigation )
73
113
74
114
75
115
class TestDockingServer (unittest .TestCase ):
@@ -146,7 +186,7 @@ def nav_execute_callback(self, goal_handle):
146
186
return result
147
187
148
188
def check_transform (self , timeout_sec , source_frame = 'odom' , target_frame = 'base_link' ):
149
- """Try to look up the transform we're publishing"""
189
+ """Try to look up the transform we're publishing. """
150
190
if self .transform_available :
151
191
return True
152
192
@@ -162,8 +202,8 @@ def check_transform(self, timeout_sec, source_frame='odom', target_frame='base_l
162
202
163
203
self .node .get_logger ().info ('Successfully found transform:' )
164
204
self .node .get_logger ().info (f'Translation: [{ transform .transform .translation .x } , '
165
- f'{ transform .transform .translation .y } , '
166
- f'{ transform .transform .translation .z } ]' )
205
+ f'{ transform .transform .translation .y } , '
206
+ f'{ transform .transform .translation .z } ]' )
167
207
self .transform_available = True
168
208
except Exception as e :
169
209
self .node .get_logger ().error (f'Error looking up transform: { str (e )} ' )
@@ -179,7 +219,6 @@ def test_docking_server(self):
179
219
self .tf_listener = TransformListener (self .tf_buffer , self .node )
180
220
self .transform_available = False
181
221
182
-
183
222
# Create a timer to run "control loop" at 20hz
184
223
self .timer = self .node .create_timer (0.05 , self .timer_callback )
185
224
@@ -222,7 +261,7 @@ def test_docking_server(self):
222
261
# Test docking action
223
262
self .action_result = []
224
263
assert self .dock_action_client .wait_for_server (timeout_sec = 5.0 ), \
225
- 'dock_robot service not available'
264
+ 'dock_robot service not available'
226
265
227
266
goal = DockRobot .Goal ()
228
267
goal .use_dock_id = True
0 commit comments