Skip to content

Commit

Permalink
Merge branch 'ros2' into fix/subscription-queue-loop
Browse files Browse the repository at this point in the history
  • Loading branch information
sea-bass authored Dec 25, 2024
2 parents d44904b + 3b30f44 commit 5a153cc
Show file tree
Hide file tree
Showing 31 changed files with 121 additions and 78 deletions.
3 changes: 3 additions & 0 deletions rosapi/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ if(BUILD_TESTING)
find_package(ament_cmake_pytest REQUIRED)
ament_add_pytest_test(${PROJECT_NAME}_test_stringify_field_types test/test_stringify_field_types.py)
ament_add_pytest_test(${PROJECT_NAME}_test_typedefs test/test_typedefs.py)

find_package(ament_cmake_mypy REQUIRED)
ament_mypy()
endif()
1 change: 1 addition & 0 deletions rosapi/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<exec_depend>rosgraph</exec_depend>
-->

<test_depend>ament_cmake_mypy</test_depend>
<test_depend>ament_cmake_pytest</test_depend>
<test_depend>sensor_msgs</test_depend>
<test_depend>shape_msgs</test_depend>
Expand Down
3 changes: 3 additions & 0 deletions rosbridge_library/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ if (BUILD_TESTING)
find_package(ament_cmake_pytest REQUIRED)
ament_add_pytest_test(test_capabilities "test/capabilities/")
ament_add_pytest_test(test_internal "test/internal/")

find_package(ament_cmake_mypy REQUIRED)
ament_mypy()
endif()
1 change: 1 addition & 0 deletions rosbridge_library/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

<test_depend>rosbridge_test_msgs</test_depend>
<test_depend>action_msgs</test_depend>
<test_depend>ament_cmake_mypy</test_depend>
<test_depend>ament_cmake_pytest</test_depend>
<test_depend>builtin_interfaces</test_depend>
<test_depend>diagnostic_msgs</test_depend>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import fnmatch
from typing import Any

import rclpy
from action_msgs.msg import GoalStatus
from rclpy.action import ActionServer
from rclpy.action.server import CancelResponse, ServerGoalHandle
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.task import Future
from rosbridge_library.capability import Capability
from rosbridge_library.internal import message_conversion
from rosbridge_library.internal.ros_loader import get_action_class
Expand All @@ -51,9 +51,9 @@ class AdvertisedActionHandler:
def __init__(
self, action_name: str, action_type: str, protocol: Protocol, sleep_time: float = 0.001
) -> None:
self.goal_futures = {}
self.goal_handles = {}
self.goal_statuses = {}
self.goal_futures: dict[str, Future] = {}
self.goal_handles: dict[str, Any] = {}
self.goal_statuses: dict[str, GoalStatus] = {}

self.action_name = action_name
self.action_type = action_type
Expand All @@ -79,7 +79,7 @@ async def execute_callback(self, goal: Any) -> Any:
# generate a unique ID
goal_id = f"action_goal:{self.action_name}:{self.next_id()}"

def done_callback(fut: rclpy.task.Future) -> None:
def done_callback(fut: Future) -> None:
if fut.cancelled():
goal.abort()
self.protocol.log("info", f"Aborted goal {goal_id}")
Expand All @@ -94,7 +94,7 @@ def done_callback(fut: rclpy.task.Future) -> None:
else:
goal.abort()

future = rclpy.task.Future()
future: Future = Future()
future.add_done_callback(done_callback)
self.goal_handles[goal_id] = goal
self.goal_futures[goal_id] = future
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import threading
from datetime import datetime
import time

from rosbridge_library.capability import Capability

Expand Down Expand Up @@ -30,7 +30,7 @@ def spam(self):
# }
# },
# ...
lists = {}
lists: dict[str, dict] = {}

def __init__(self):
"""Create singleton instance"""
Expand Down Expand Up @@ -84,13 +84,13 @@ def __init__(self, protocol):
# 4.b) pass the reconstructed message string to protocol.incoming() # protocol.incoming is checking message fields by itself, so no need to do this before passing the reconstructed message to protocol
# 4.c) remove the fragment list to free up memory
def defragment(self, message):
now = datetime.now()
now = time.monotonic()

if self.received_fragments is not None:
for id in self.received_fragments.keys():
time_diff = now - self.received_fragments[id]["timestamp_last_append"]
if (
time_diff.total_seconds() > self.fragment_timeout
time_diff > self.fragment_timeout
and not self.received_fragments[id]["is_reconstructing"]
):
log_msg = ["fragment list ", str(id), " timed out.."]
Expand Down Expand Up @@ -188,15 +188,15 @@ def defragment(self, message):
log_msg = "".join(log_msg)
self.protocol.log("debug", log_msg)

duration = datetime.now() - now
duration = time.monotonic() - now

# Pass the reconstructed message to rosbridge
self.protocol.incoming(reconstructed_msg)
log_msg = ["reconstructed message (ID:" + str(msg_id) + ") from "]
log_msg.extend([str(msg_total), " fragments. "])
# cannot access msg.data if message is a service_response or else!
# log_msg += "[message length: " + str(len(str(json.loads(reconstructed_msg)["msg"]["data"]))) +"]"
log_msg.extend(["[duration: ", str(duration.total_seconds()), " s]"])
log_msg.extend(["[duration: ", str(duration), " s]"])
log_msg = "".join(log_msg)
self.protocol.log("info", log_msg)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import fnmatch
from functools import partial
from threading import Thread
from typing import Any

from action_msgs.msg import GoalStatus
from rosbridge_library.capability import Capability
Expand All @@ -52,7 +53,7 @@ class SendActionGoal(Capability):
cancel_action_goal_msg_fields = [(True, "action", str)]

actions_glob = None
client_handler_list = {}
client_handler_list: dict[str, ActionClientHandler] = {}

def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Expand Down Expand Up @@ -182,7 +183,7 @@ def _failure(self, cid: str, action: str, exc: Exception) -> None:
outgoing_message["id"] = cid
self.protocol.send(outgoing_message)

def _feedback(self, cid: str, action: str, message: dict) -> None:
def _feedback(self, cid: str, action: str, message: Any) -> None:
outgoing_message = {
"op": "action_feedback",
"action": action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
from rosbridge_library.internal.subscription_modifiers import MessageHandler

try:
from ujson import dumps as encode_json
from ujson import dumps as encode_json # type: ignore[import-untyped]
except ImportError:
try:
from simplejson import dumps as encode_json
from simplejson import dumps as encode_json # type: ignore[import-untyped]
except ImportError:
from json import dumps as encode_json

Expand Down
16 changes: 8 additions & 8 deletions rosbridge_library/src/rosbridge_library/internal/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

import time
from threading import Thread
from typing import Any, Callable, Optional, Union
from typing import Any, Callable, Optional

from rclpy.action import ActionClient
from rclpy.expand_topic_name import expand_topic_name
Expand All @@ -59,9 +59,9 @@ def __init__(
action: str,
action_type: str,
args: dict,
success_callback: Callable[[str, str, int, bool, dict], None],
error_callback: Callable[[str, str, Exception], None],
feedback_callback: Callable[[str, str, dict], None],
success_callback: Callable[[dict], None],
error_callback: Callable[[Exception], None],
feedback_callback: Optional[Callable[[dict], None]],
node_handle: Node,
) -> None:
"""
Expand Down Expand Up @@ -90,12 +90,11 @@ def __init__(
self.error = error_callback
self.feedback = feedback_callback
self.node_handle = node_handle
self.send_goal_helper = None
self.send_goal_helper = SendGoal()

def run(self) -> None:
try:
# Call the service and pass the result to the success handler
self.send_goal_helper = SendGoal()
self.success(
self.send_goal_helper.send_goal(
self.node_handle,
Expand All @@ -110,7 +109,7 @@ def run(self) -> None:
self.error(e)


def args_to_action_goal_instance(action: str, inst: Any, args: Union[list, dict]) -> Any:
def args_to_action_goal_instance(action: str, inst: Any, args: list | dict | None) -> Any:
""" "
Populate an action goal instance with the provided args
Expand Down Expand Up @@ -142,6 +141,7 @@ def get_result_cb(self, future: Future) -> None:

def goal_response_cb(self, future: Future) -> None:
self.goal_handle = future.result()
assert self.goal_handle is not None
if not self.goal_handle.accepted:
raise Exception("Action goal was rejected")
result_future = self.goal_handle.get_result_async()
Expand All @@ -156,7 +156,7 @@ def send_goal(
action: str,
action_type: str,
args: Optional[dict] = None,
feedback_cb: Optional[Callable[[str, str, dict], None]] = None,
feedback_cb: Optional[Callable[[dict], None]] = None,
) -> dict:
# Given the action name and type, fetch a request instance
action_name = expand_topic_name(action, node_handle.get_name(), node_handle.get_namespace())
Expand Down
10 changes: 5 additions & 5 deletions rosbridge_library/src/rosbridge_library/internal/ros_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
"""

# Variable containing the loaded classes
_loaded_msgs = {}
_loaded_srvs = {}
_loaded_actions = {}
_loaded_msgs: dict[str, Any] = {}
_loaded_srvs: dict[str, Any] = {}
_loaded_actions: dict[str, Any] = {}
_msgs_lock = Lock()
_srvs_lock = Lock()
_actions_lock = Lock()
Expand Down Expand Up @@ -185,7 +185,7 @@ def _get_class(typestring: str, subname: str, cache: Dict[str, Any], lock: Lock)
return cls


def _load_class(modname: str, subname: str, classname: str) -> None:
def _load_class(modname: str, subname: str, classname: str) -> Any:
"""Loads the manifest and imports the module that contains the specified
type.
Expand Down Expand Up @@ -220,7 +220,7 @@ def _splittype(typestring: str) -> Tuple[str, str]:
raise InvalidTypeStringException(typestring)


def _add_to_cache(cache: Dict[str, Any], lock: Lock, key: str, value: any) -> None:
def _add_to_cache(cache: Dict[str, Any], lock: Lock, key: str, value: Any) -> None:
lock.acquire()
cache[key] = value
lock.release()
Expand Down
16 changes: 8 additions & 8 deletions rosbridge_library/src/rosbridge_library/internal/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ def __init__(
self,
service: str,
args: dict,
success_callback: Callable[[str, str, int, bool, Any], None],
error_callback: Callable[[str, str, Exception], None],
success_callback: Callable[[dict], None],
error_callback: Callable[[Exception], None],
node_handle: Node,
) -> None:
"""Create a service caller for the specified service. Use start()
Expand Down Expand Up @@ -94,7 +94,7 @@ def run(self) -> None:
self.error(e)


def args_to_service_request_instance(service: str, inst: Any, args: dict) -> Any:
def args_to_service_request_instance(service: str, inst: Any, args: list | dict | None) -> Any:
"""Populate a service request instance with the provided args
args can be a dictionary of values, or a list, or None
Expand Down Expand Up @@ -122,14 +122,14 @@ def call_service(
service = expand_topic_name(service, node_handle.get_name(), node_handle.get_namespace())

service_names_and_types = dict(node_handle.get_service_names_and_types())
service_type = service_names_and_types.get(service)
if service_type is None:
service_types = service_names_and_types.get(service)
if service_types is None:
raise InvalidServiceException(service)

# service_type is a tuple of types at this point; only one type is supported.
if len(service_type) > 1:
node_handle.get_logger().warning(f"More than one service type detected: {service_type}")
service_type = service_type[0]
if len(service_types) > 1:
node_handle.get_logger().warning(f"More than one service type detected: {service_types}")
service_type = service_types[0]

service_class = get_service_class(service_type)
inst = get_service_request_instance(service_type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
# POSSIBILITY OF SUCH DAMAGE.

import sys
import time
import traceback
from collections import deque
from threading import Condition, Thread
from time import time

""" Sits between incoming messages from a subscription, and the outgoing
publish method. Provides throttling / buffering capabilities.
Expand Down Expand Up @@ -66,10 +66,10 @@ def set_queue_length(self, queue_length):
return self.transition()

def time_remaining(self):
return max((self.last_publish + self.throttle_rate) - time(), 0)
return max((self.last_publish + self.throttle_rate) - time.monotonic(), 0)

def handle_message(self, msg):
self.last_publish = time()
self.last_publish = time.monotonic()
self.publish(msg)

def transition(self):
Expand Down
5 changes: 3 additions & 2 deletions rosbridge_library/src/rosbridge_library/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
# POSSIBILITY OF SUCH DAMAGE.

import time
from typing import Any

from rosbridge_library.capabilities.fragmentation import Fragmentation
from rosbridge_library.util import bson, json
Expand Down Expand Up @@ -81,9 +82,9 @@ class Protocol:
# !! this might be related to (or even be avoided by using) throttle_rate !!
delay_between_messages = 0
# global list of non-ros advertised services
external_service_list = {}
external_service_list: dict[str, Any] = {}
# global list of non-ros advertised actions
external_action_list = {}
external_action_list: dict[str, Any] = {}
# Use only BSON for the whole communication if the server has been started with bson_only_mode:=True
bson_only_mode = False

Expand Down
4 changes: 2 additions & 2 deletions rosbridge_library/src/rosbridge_library/util/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# try to import json-lib: 1st try ujson, 2nd try simplejson, else import standard Python json
try:
import ujson as json
import ujson as json # type: ignore[import-untyped]
except ImportError:
try:
import simplejson as json
import simplejson as json # type: ignore[import-untyped]
except ImportError:
import json # noqa: F401

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ def request_service():
try:
incoming = sock.recv(max_msg_length) # receive service_response from rosbridge
if buffer == "":
buffer = incoming
buffer = incoming.decode("utf-8")
if incoming == "":
print("closing socket")
sock.close()
break
else:
buffer = buffer + incoming
buffer = buffer + incoming.decode("utf-8")
# print "buffer-length:", len(buffer)
try: # try to access service_request directly (not fragmented)
data_object = json.loads(buffer)
Expand All @@ -90,14 +90,14 @@ def request_service():
"}{"
) # split buffer into fragments and re-fill curly brackets
result = []
for fragment in result_string:
if fragment[0] != "{":
fragment = "{" + fragment
if fragment[len(fragment) - 1] != "}":
fragment = fragment + "}"
for fragment_str in result_string:
if fragment_str[0] != "{":
fragment_str = "{" + fragment_str
if fragment_str[len(fragment_str) - 1] != "}":
fragment_str = fragment_str + "}"
try:
result.append(
json.loads(fragment)
json.loads(fragment_str)
) # try to parse json from string, and append if successful
except Exception:
# print(e)
Expand Down
Loading

0 comments on commit 5a153cc

Please sign in to comment.