From f857c5d8188fa6703c4bf07f19f8ecbeede99ce3 Mon Sep 17 00:00:00 2001 From: Sebastian Castro Date: Tue, 31 Oct 2023 16:10:03 -0400 Subject: [PATCH] More actions functionality and tests --- .../capabilities/action_result.py | 32 ++++ .../capabilities/advertise_action.py | 56 +++++- .../capabilities/send_action_goal.py | 173 ++++++++++++++++++ .../src/rosbridge_library/internal/actions.py | 89 +++++---- .../capabilities/test_action_capabilities.py | 71 ++++++- .../test/internal/actions/test_actions.py | 14 +- 6 files changed, 386 insertions(+), 49 deletions(-) create mode 100644 rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py diff --git a/rosbridge_library/src/rosbridge_library/capabilities/action_result.py b/rosbridge_library/src/rosbridge_library/capabilities/action_result.py index 6fc979c7d..cd8a61162 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/action_result.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/action_result.py @@ -1,3 +1,35 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2023, PickNik Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + from rosbridge_library.capability import Capability from rosbridge_library.internal import message_conversion, ros_loader diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py index 0ab281c0d..76aaf898f 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py @@ -1,4 +1,37 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2023, PickNik Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + import fnmatch +import time import rclpy from rclpy.action import ActionServer @@ -12,11 +45,12 @@ class AdvertisedActionHandler: id_counter = 1 - def __init__(self, action_name, action_type, protocol): + def __init__(self, action_name, action_type, protocol, sleep_time=0.001): self.goal_futures = {} self.action_name = action_name self.action_type = action_type self.protocol = protocol + self.sleep_time = sleep_time # setup the action self.action_server = ActionServer( protocol.node_handle, @@ -31,26 +65,30 @@ def next_id(self): self.id_counter += 1 return id - async def execute_callback(self, goal): + def execute_callback(self, goal): # generate a unique ID - goal_id = f"action_goal:{self.action}:{self.next_id()}" + goal_id = f"action_goal:{self.action_name}:{self.next_id()}" future = rclpy.task.Future() - self.request_futures[goal_id] = future + self.goal_futures[goal_id] = future # build a request to send to the external client goal_message = { "op": "send_action_goal", "id": goal_id, "action": self.action_name, - "args": message_conversion.extract_values(goal), + "action_type": self.action_type, + "args": message_conversion.extract_values(goal.request), } self.protocol.send(goal_message) - try: - return await future - finally: - del self.goal_futures[goal_id] + while not future.done(): + time.sleep(self.sleep_time) + + result = future.result() + goal.succeed() + del self.goal_futures[goal_id] + return result def handle_result(self, goal_id, res): """ diff --git a/rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py b/rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py new file mode 100644 index 000000000..4175bf6e3 --- /dev/null +++ b/rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py @@ -0,0 +1,173 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2023, PickNik Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import fnmatch +from functools import partial +from threading import Thread + +from rosbridge_library.capability import Capability +from rosbridge_library.internal.actions import ActionClientHandler + + +class SendActionGoal(Capability): + + send_action_goal_msg_fields = [ + (True, "action", str), + (True, "action_type", str), + (False, "fragment_size", (int, type(None))), + (False, "compression", str), + ] + + actions_glob = None + + def __init__(self, protocol): + # Call superclass constructor + Capability.__init__(self, protocol) + + # Register the operations that this capability provides + call_services_in_new_thread = ( + protocol.node_handle.get_parameter("call_services_in_new_thread") + .get_parameter_value() + .bool_value + ) + if call_services_in_new_thread: + # Sends the action goal in a separate thread so multiple actions can be processed simultaneously. + protocol.node_handle.get_logger().info("Sending action goal in new thread") + protocol.register_operation( + "send_action_goal", + lambda msg: Thread(target=self.send_action_goal, args=(msg,)).start(), + ) + else: + # Sends the actions goal in this thread, so actions block and must be processed sequentially. + protocol.node_handle.get_logger().info("Sending action goal in existing thread") + protocol.register_operation("send_action_goal", self.send_action_goal) + + def send_action_goal(self, message): + # Pull out the ID + cid = message.get("id", None) + + # Typecheck the args + self.basic_type_check(message, self.send_action_goal_msg_fields) + + # Extract the args + action = message["action"] + action_type = message["action_type"] + fragment_size = message.get("fragment_size", None) + compression = message.get("compression", "none") + args = message.get("args", []) + + if SendActionGoal.actions_glob is not None and SendActionGoal.actions_glob: + self.protocol.log("debug", f"Action security glob enabled, checking action: {action}") + match = False + for glob in SendActionGoal.actions_glob: + if fnmatch.fnmatch(action, glob): + self.protocol.log( + "debug", + f"Found match with glob {glob}, continuing sending action goal...", + ) + match = True + break + if not match: + self.protocol.log( + "warn", + f"No match found for action, cancelling sending action goal for: {action}", + ) + return + else: + self.protocol.log("debug", "No action security glob, not checking sending action goal.") + + # Check for deprecated action ID, eg. /rosbridge/topics#33 + cid = extract_id(action, cid) + + # Create the callbacks + s_cb = partial(self._success, cid, action, fragment_size, compression) + e_cb = partial(self._failure, cid, action) + feedback = True # TODO: Implement + if feedback: + f_cb = partial(self._feedback, cid, action) + else: + f_cb = None + + # Run action client handler in the same thread. + ActionClientHandler( + trim_action_name(action), action_type, args, s_cb, e_cb, f_cb, self.protocol.node_handle + ).run() + + def _success(self, cid, action, fragment_size, compression, message): + outgoing_message = { + "op": "action_result", + "action": action, + "values": message, + "result": True, + } + if cid is not None: + outgoing_message["id"] = cid + # TODO: fragmentation, compression + self.protocol.send(outgoing_message) + + def _failure(self, cid, action, exc): + self.protocol.log("error", "send_action_goal %s: %s" % (type(exc).__name__, str(exc)), cid) + # send response with result: false + outgoing_message = { + "op": "action_result", + "service": action, + "values": str(exc), + "result": False, + } + if cid is not None: + outgoing_message["id"] = cid + self.protocol.send(outgoing_message) + + def _feedback(self, cid, action, message): + outgoing_message = { + "op": "action_feedback", + "action": action, + "values": message, + } + if cid is not None: + outgoing_message["id"] = cid + # TODO: fragmentation, compression + print(outgoing_message) + self.protocol.send(outgoing_message) + + +def trim_action_name(action): + if "#" in action: + return action[: action.find("#")] + return action + + +def extract_id(action, cid): + if cid is not None: + return cid + elif "#" in action: + return action[action.find("#") + 1 :] diff --git a/rosbridge_library/src/rosbridge_library/internal/actions.py b/rosbridge_library/src/rosbridge_library/internal/actions.py index db4943a72..a94acefe6 100644 --- a/rosbridge_library/src/rosbridge_library/internal/actions.py +++ b/rosbridge_library/src/rosbridge_library/internal/actions.py @@ -33,7 +33,6 @@ import time from threading import Thread -import rclpy from rclpy.action import ActionClient from rclpy.expand_topic_name import expand_topic_name from rosbridge_library.internal.message_conversion import ( @@ -52,7 +51,16 @@ def __init__(self, action_name): class ActionClientHandler(Thread): - def __init__(self, action, action_type, args, success_callback, error_callback, node_handle): + def __init__( + self, + action, + action_type, + args, + success_callback, + error_callback, + feedback_callback, + node_handle, + ): """ Create a client handler for the specified action. Use start() to start in a separate thread or run() to run in this thread. @@ -76,12 +84,21 @@ def __init__(self, action, action_type, args, success_callback, error_callback, self.args = args self.success = success_callback self.error = error_callback + self.feedback = feedback_callback self.node_handle = node_handle def run(self): try: # Call the service and pass the result to the success handler - self.success(send_goal(self.node_handle, self.action, self.action_type, args=self.args)) + self.success( + SendGoal().send_goal( + self.node_handle, + self.action, + self.action_type, + args=self.args, + feedback_cb=self.feedback, + ) + ) except Exception as e: # On error, just pass the exception to the error handler self.error(e) @@ -105,31 +122,41 @@ def args_to_action_goal_instance(action, inst, args): populate_instance(msg, inst) -def send_goal(node_handle, action, action_type, args=None, sleep_time=0.001): - # Given the action nam and type, fetch a request instance - action_name = expand_topic_name(action, node_handle.get_name(), node_handle.get_namespace()) - action_class = get_action_class(action_type) - inst = get_action_goal_instance(action_type) - - # Populate the instance with the provided args - args_to_action_goal_instance(action_name, inst, args) - - client = ActionClient(node_handle, action_class, action_name) - send_goal_future = client.send_goal_async(inst) - while rclpy.ok() and not send_goal_future.done(): - time.sleep(sleep_time) - goal_handle = send_goal_future.result() - - if not goal_handle.accepted: - raise Exception("Action goal was rejected") # TODO: Catch better - - result = goal_handle.get_result() - client.destroy() - - if result is not None: - # Turn the response into JSON and pass to the callback - json_response = extract_values(result) - else: - raise Exception(result) - - return json_response +class SendGoal: + def get_result_cb(self, future): + self.result = future.result() + + def goal_response_cb(self, future): + goal_handle = future.result() + if not goal_handle.accepted: + raise Exception("Action goal was rejected") + result_future = goal_handle.get_result_async() + result_future.add_done_callback(self.get_result_cb) + + def send_goal( + self, node_handle, action, action_type, args=None, feedback_cb=None, sleep_time=0.001 + ): + # Given the action nam and type, fetch a request instance + action_name = expand_topic_name(action, node_handle.get_name(), node_handle.get_namespace()) + action_class = get_action_class(action_type) + inst = get_action_goal_instance(action_type) + + # Populate the instance with the provided args + args_to_action_goal_instance(action_name, inst, args) + + self.result = None + client = ActionClient(node_handle, action_class, action_name) + send_goal_future = client.send_goal_async(inst, feedback_callback=feedback_cb) + send_goal_future.add_done_callback(self.goal_response_cb) + + while self.result is None: + time.sleep(sleep_time) + + client.destroy() + if self.result is not None: + # Turn the response into JSON and pass to the callback + json_response = extract_values(self.result) + else: + raise Exception(self.result) + + return json_response diff --git a/rosbridge_library/test/capabilities/test_action_capabilities.py b/rosbridge_library/test/capabilities/test_action_capabilities.py index acd033301..7e72ca8da 100755 --- a/rosbridge_library/test/capabilities/test_action_capabilities.py +++ b/rosbridge_library/test/capabilities/test_action_capabilities.py @@ -1,11 +1,15 @@ #!/usr/bin/env python +import time import unittest from json import dumps, loads +from threading import Thread import rclpy +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rosbridge_library.capabilities.action_result import ActionResult from rosbridge_library.capabilities.advertise_action import AdvertiseAction +from rosbridge_library.capabilities.send_action_goal import SendActionGoal from rosbridge_library.internal.exceptions import ( InvalidArgumentException, MissingArgumentException, @@ -16,7 +20,9 @@ class TestActionCapabilities(unittest.TestCase): def setUp(self): rclpy.init() + self.executor = SingleThreadedExecutor() self.node = Node("test_action_capabilities") + self.executor.add_node(self.node) self.node.declare_parameter("call_services_in_new_thread", False) @@ -29,15 +35,20 @@ def setUp(self): self.advertise = AdvertiseAction(self.proto) # self.unadvertise = UnadvertiseService(self.proto) self.result = ActionResult(self.proto) - # self.call_service = CallService(self.proto) + self.send_goal = SendActionGoal(self.proto) self.received_message = None self.log_entries = [] + self.exec_thread = Thread(target=self.executor.spin) + self.exec_thread.start() + def tearDown(self): - self.node.destroy_node() + self.executor.remove_node(self.node) + self.executor.shutdown() rclpy.shutdown() def local_send_cb(self, msg): + print(f"Received: {msg}") self.received_message = msg def mock_log(self, loglevel, message, _=None): @@ -91,8 +102,62 @@ def test_execute_advertised_action(self): ) self.received_message = None self.advertise.advertise_action(advertise_msg) + # rclpy.spin_once(self.node, timeout_sec=0.1) + + # Send a goal to the advertised action using rosbridge + self.received_message = None + goal_msg = loads( + dumps( + { + "op": "call_service", + "id": "foo", + "action": action_path, + "action_type": "example_interfaces/Fibonacci", + "args": {"order": 5}, + } + ) + ) + Thread(target=self.send_goal.send_action_goal, args=(goal_msg,)).start() + + loop_iterations = 0 + while self.received_message is None: + time.sleep(0.5) + loop_iterations += 1 + if loop_iterations > 3: + self.fail("Timed out waiting for action goal message.") - # TODO: Fill out the rest + self.assertIsNotNone(self.received_message) + self.assertTrue("op" in self.received_message) + self.assertTrue(self.received_message["op"] == "send_action_goal") + self.assertTrue("id" in self.received_message) + + # TODO: Send feedback + + # Now send the result + result_msg = loads( + dumps( + { + "op": "action_result", + "action": action_path, + "id": self.received_message["id"], + "values": {"sequence": [1, 1, 2, 3, 5]}, + "result": True, + } + ) + ) + self.received_message = None + self.result.action_result(result_msg) + + loop_iterations = 0 + while self.received_message is None: + time.sleep(0.5) + loop_iterations += 1 + if loop_iterations > 3: + self.fail("Timed out waiting for action result message.") + + self.assertIsNotNone(self.received_message) + self.assertEqual(self.received_message["op"], "action_result") + self.assertEqual(self.received_message["values"]["result"]["sequence"], [1, 1, 2, 3, 5]) if __name__ == "__main__": diff --git a/rosbridge_library/test/internal/actions/test_actions.py b/rosbridge_library/test/internal/actions/test_actions.py index 81007d718..dea523e3b 100755 --- a/rosbridge_library/test/internal/actions/test_actions.py +++ b/rosbridge_library/test/internal/actions/test_actions.py @@ -149,7 +149,7 @@ def get_result_callback(future): self.assertEqual(list(self.result.sequence), [0, 1, 1, 2, 3, 5]) # Now, call using the services - json_ret = actions.send_goal( + json_ret = actions.SendGoal().send_goal( self.node, "get_fibonacci_sequence", "example_interfaces/Fibonacci", @@ -161,28 +161,30 @@ def test_action_client_handler(self): """Same as test_service_call but via the thread caller""" ActionTester(self.executor) - rcvd = {"json": None} + received = {"json": None} def success(json): - rcvd["json"] = json + received["json"] = json def error(): raise Exception() # Now, call using the services + order = 5 actions.ActionClientHandler( "get_fibonacci_sequence", "example_interfaces/Fibonacci", - {"order": 5}, + {"order": order}, success, error, + None, # No feedback self.node, ).start() time.sleep(1.0) - self.assertIsNotNone(rcvd["json"]) - self.assertEqual(list(rcvd["json"]["result"]["sequence"]), [0, 1, 1, 2, 3, 5]) + self.assertIsNotNone(received["json"]) + self.assertEqual(list(received["json"]["result"]["sequence"]), [0, 1, 1, 2, 3, 5]) if __name__ == "__main__":