Skip to content

Commit

Permalink
Add timeout option to call_service messages (#984)
Browse files Browse the repository at this point in the history
* Add timeout option to call_service messages

* Add no service timeout option, use future done callback instead of sleep

* Remove unused imports

* Run isort

* Use event instead of condition variable for simplicity

* Fix call_advertised_service test by increasing executor spin iterations

* Spin for a specific time instead of specific number of spin iterations

* Add parameter for default call_service timeout

* Add call service timeout tests to rosbridge_library

* Add call service timeout test to rosbridge_server
  • Loading branch information
bjsowa authored Jan 20, 2025
1 parent 0af768d commit 673fc05
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 47 deletions.
4 changes: 3 additions & 1 deletion ROSBRIDGE_PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,8 @@ Calls a ROS service.
"service": <string>,
(optional) "args": <list<json>>,
(optional) "fragment_size": <int>,
(optional) "compression": <string>
(optional) "compression": <string>,
(optional) "timeout": <float>
}
```

Expand All @@ -449,6 +450,7 @@ Calls a ROS service.
before it is fragmented
* **compression** – an optional string to specify the compression scheme to be
used on messages. Valid values are "none" and "png"
* **timeout** – the time, in seconds, to wait for a response from the server


Stops advertising an external ROS service server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def __init__(self, protocol):
# Call superclass constructor
Capability.__init__(self, protocol)

self.default_timeout = (
protocol.node_handle.get_parameter("default_call_service_timeout")
.get_parameter_value()
.double_value
)

# Register the operations that this capability provides
call_services_in_new_thread = (
protocol.node_handle.get_parameter("call_services_in_new_thread")
Expand Down Expand Up @@ -81,6 +87,7 @@ def call_service(self, message):
fragment_size = message.get("fragment_size", None)
compression = message.get("compression", "none")
args = message.get("args", [])
timeout = message.get("timeout", self.default_timeout)

if CallService.services_glob is not None and CallService.services_glob:
self.protocol.log(
Expand Down Expand Up @@ -112,7 +119,14 @@ def call_service(self, message):
e_cb = partial(self._failure, cid, service)

# Run service caller in the same thread.
ServiceCaller(trim_servicename(service), args, s_cb, e_cb, self.protocol.node_handle).run()
ServiceCaller(
trim_servicename(service),
args,
timeout,
s_cb,
e_cb,
self.protocol.node_handle,
).run()

def _success(self, cid, service, fragment_size, compression, message):
outgoing_message = {
Expand Down
43 changes: 32 additions & 11 deletions rosbridge_library/src/rosbridge_library/internal/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import time
from threading import Thread
from threading import Event, Thread
from typing import Any, Callable, Optional

import rclpy
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.expand_topic_name import expand_topic_name
from rclpy.node import Node
Expand All @@ -58,6 +56,7 @@ def __init__(
self,
service: str,
args: dict,
timeout: float,
success_callback: Callable[[dict], None],
error_callback: Callable[[Exception], None],
node_handle: Node,
Expand All @@ -71,6 +70,8 @@ def __init__(
ordered list, or a dict of name-value pairs. Anything else will be
treated as though no arguments were provided (which is still valid for
some kinds of service)
timeout -- the time, in seconds, to wait for a response from the server.
A non-positive value means no timeout.
success_callback -- a callback to call with the JSON result of the
service call
error_callback -- a callback to call if an error occurs. The
Expand All @@ -81,14 +82,22 @@ def __init__(
self.daemon = True
self.service = service
self.args = args
self.timeout = timeout
self.success = success_callback
self.error = error_callback
self.node_handle = node_handle

def run(self) -> None:
try:
# Call the service and pass the result to the success handler
self.success(call_service(self.node_handle, self.service, args=self.args))
self.success(
call_service(
self.node_handle,
self.service,
args=self.args,
server_response_timeout=self.timeout,
)
)
except Exception as e:
# On error, just pass the exception to the error handler
self.error(e)
Expand All @@ -114,8 +123,8 @@ def call_service(
node_handle: Node,
service: str,
args: Optional[dict] = None,
server_timeout_time: float = 1.0,
sleep_time: float = 0.001,
server_ready_timeout: float = 1.0,
server_response_timeout: float = 5.0,
) -> dict:
# Given the service name, fetch the type and class of the service,
# and a request instance
Expand All @@ -141,20 +150,32 @@ def call_service(
service_class, service, callback_group=ReentrantCallbackGroup()
)

if not client.wait_for_service(server_timeout_time):
if not client.wait_for_service(server_ready_timeout):
node_handle.destroy_client(client)
raise InvalidServiceException(service)

future = client.call_async(inst)
while rclpy.ok() and not future.done():
time.sleep(sleep_time)
result = future.result()
event = Event()

def future_done_callback():
event.set()

future.add_done_callback(lambda _: future_done_callback())

if not event.wait(timeout=(server_response_timeout if server_response_timeout > 0 else None)):
future.cancel()
node_handle.destroy_client(client)
raise Exception("Timeout exceeded while waiting for service response")

node_handle.destroy_client(client)

result = future.result()

if result is not None:
# Turn the response into JSON and pass to the callback
json_response = extract_values(result)
else:
raise Exception(result)
exception = future.exception()
raise Exception("Service call exception: " + str(exception))

return json_response
73 changes: 55 additions & 18 deletions rosbridge_library/test/capabilities/test_call_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ def trigger_cb(self, request, response):
response.message = "called trigger service successfully"
return response

def trigger_long_cb(self, request, response):
"""Helper callback function for a long running test service with no arguments."""
time.sleep(0.5)
response.success = True
response.message = "called trigger service successfully"
return response

def set_bool_cb(self, request, response):
"""Helper callback function for a test service with arguments."""
response.success = request.data
Expand All @@ -40,6 +47,7 @@ def setUp(self):
self.executor.add_node(self.node)

self.node.declare_parameter("call_services_in_new_thread", False)
self.node.declare_parameter("default_call_service_timeout", 5.0)
self.node.declare_parameter("send_action_goals_in_new_thread", False)

# Create service servers with a separate callback group
Expand All @@ -50,6 +58,12 @@ def setUp(self):
self.trigger_cb,
callback_group=self.cb_group,
)
self.trigger_long_srv = self.node.create_service(
Trigger,
self.node.get_name() + "/trigger_long",
self.trigger_long_cb,
callback_group=self.cb_group,
)
self.set_bool_srv = self.node.create_service(
SetBool,
self.node.get_name() + "/set_bool",
Expand All @@ -62,8 +76,9 @@ def setUp(self):

def tearDown(self):
self.executor.remove_node(self.node)
self.node.destroy_node()
self.executor.shutdown()
self.exec_thread.join()
self.node.destroy_node()
rclpy.shutdown()

def test_missing_arguments(self):
Expand Down Expand Up @@ -98,12 +113,6 @@ def cb(msg, cid=None, compression="none"):

s.call_service(send_msg)

timeout = 1.0
start = time.time()
while time.time() - start < timeout:
if received["arrived"]:
break

self.assertTrue(received["arrived"])
values = received["msg"]["values"]
self.assertEqual(values["success"], True)
Expand Down Expand Up @@ -135,12 +144,6 @@ def cb(msg, cid=None, compression="none"):

s.call_service(send_msg)

timeout = 1.0
start = time.time()
while time.time() - start < timeout:
if received["arrived"]:
break

self.assertTrue(received["arrived"])
values = received["msg"]["values"]
self.assertEqual(values["success"], True)
Expand Down Expand Up @@ -174,11 +177,45 @@ def cb(msg, cid=None, compression="none"):

s.call_service(send_msg)

timeout = 1.0
start = time.time()
while time.time() - start < timeout:
if received["arrived"]:
break
self.assertTrue(received["arrived"])
self.assertFalse(received["msg"]["result"])

def test_call_service_timeout(self):

client = self.node.create_client(Trigger, self.trigger_long_srv.srv_name)
assert client.wait_for_service(1.0)

proto = Protocol("test_call_service_timeout", self.node)
s = CallService(proto)
send_msg = loads(
dumps({"op": "call_service", "service": self.trigger_long_srv.srv_name, "timeout": 2.0})
)

received = {"msg": None, "arrived": False}

def cb(msg, cid=None, compression="none"):
print("Received message")
received["msg"] = msg
received["arrived"] = True

proto.send = cb

s.call_service(send_msg)

self.assertTrue(received["arrived"])
self.assertTrue(received["msg"]["result"])
values = received["msg"]["values"]
self.assertEqual(values["success"], True)
self.assertEqual(values["message"], "called trigger service successfully")

send_msg = loads(
dumps({"op": "call_service", "service": self.trigger_long_srv.srv_name, "timeout": 0.1})
)
received = {"msg": None, "arrived": False}

s.call_service(send_msg)

self.assertTrue(received["arrived"])
self.assertFalse(received["msg"]["result"])
values = received["msg"]["values"]
self.assertEqual(values, "Timeout exceeded while waiting for service response")
Loading

0 comments on commit 673fc05

Please sign in to comment.