From b63b1c9d99de714e53acf20c137c9f3d204f7984 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20Chirico=20Indreb=C3=B8?= Date: Thu, 27 Feb 2025 15:17:02 +0100 Subject: [PATCH] Add type hinting to event queues --- .../models/communication/queues/events.py | 35 +++++++++++-------- .../communication/queues/queue_utils.py | 20 +++++++---- .../communication/queues/status_queue.py | 12 ++++--- src/isar/robot/robot.py | 4 +-- src/isar/robot/robot_stop_mission.py | 25 +++++++++---- src/isar/state_machine/states/idle.py | 10 ++++-- src/isar/state_machine/states/monitor.py | 4 +-- src/isar/state_machine/states/offline.py | 4 ++- src/isar/state_machine/states/stop.py | 9 ++--- src/isar/state_machine/transitions/stop.py | 4 +-- 10 files changed, 80 insertions(+), 47 deletions(-) diff --git a/src/isar/models/communication/queues/events.py b/src/isar/models/communication/queues/events.py index 53cfda79..3f6cf8ef 100644 --- a/src/isar/models/communication/queues/events.py +++ b/src/isar/models/communication/queues/events.py @@ -1,8 +1,14 @@ from queue import Queue +from transitions import State + from isar.config.settings import settings from isar.models.communication.queues.queue_io import QueueIO from isar.models.communication.queues.status_queue import StatusQueue +from robot_interface.models.exceptions.robot_exceptions import ErrorMessage +from robot_interface.models.mission.mission import Mission +from robot_interface.models.mission.status import RobotStatus, TaskStatus +from robot_interface.models.mission.task import TASKS class Events: @@ -27,26 +33,25 @@ def __init__(self) -> None: class StateMachineEvents: def __init__(self) -> None: - self.start_mission: Queue = Queue(maxsize=1) - self.stop_mission: Queue = Queue(maxsize=1) - self.pause_mission: Queue = Queue(maxsize=1) - self.task_status_request: Queue = Queue(maxsize=1) - self.robot_status_request: Queue = Queue(maxsize=1) + self.start_mission: Queue[Mission] = Queue(maxsize=1) + self.stop_mission: Queue[bool] = Queue(maxsize=1) + self.pause_mission: Queue[bool] = Queue(maxsize=1) + self.task_status_request: Queue[str] = Queue(maxsize=1) class RobotServiceEvents: def __init__(self) -> None: - self.task_status_updated: Queue = Queue(maxsize=1) - self.task_status_failed: Queue = Queue(maxsize=1) - self.mission_started: Queue = Queue(maxsize=1) - self.mission_failed: Queue = Queue(maxsize=1) - self.robot_status_changed: Queue = Queue(maxsize=1) - self.mission_failed_to_stop: Queue = Queue(maxsize=1) - self.mission_successfully_stopped: Queue = Queue(maxsize=1) + self.task_status_updated: Queue[TaskStatus] = Queue(maxsize=1) + self.task_status_failed: Queue[ErrorMessage] = Queue(maxsize=1) + self.mission_started: Queue[bool] = Queue(maxsize=1) + self.mission_failed: Queue[ErrorMessage] = Queue(maxsize=1) + self.robot_status_changed: Queue[bool] = Queue(maxsize=1) + self.mission_failed_to_stop: Queue[ErrorMessage] = Queue(maxsize=1) + self.mission_successfully_stopped: Queue[bool] = Queue(maxsize=1) class SharedState: def __init__(self) -> None: - self.state: StatusQueue = StatusQueue() - self.robot_status: StatusQueue = StatusQueue() - self.state_machine_current_task: StatusQueue = StatusQueue() + self.state: StatusQueue[State] = StatusQueue() + self.robot_status: StatusQueue[RobotStatus] = StatusQueue() + self.state_machine_current_task: StatusQueue[TASKS] = StatusQueue() diff --git a/src/isar/models/communication/queues/queue_utils.py b/src/isar/models/communication/queues/queue_utils.py index 191f60e2..c1678678 100644 --- a/src/isar/models/communication/queues/queue_utils.py +++ b/src/isar/models/communication/queues/queue_utils.py @@ -1,25 +1,31 @@ from queue import Empty, Queue -from typing import Any +from typing import Optional, TypeVar from isar.models.communication.queues.status_queue import StatusQueue +T = TypeVar("T") -def trigger_event(queue: Queue, data: Any = None) -> None: - queue.put(data if data is not None else True) +def trigger_event_without_data(queue: Queue[bool]) -> None: + queue.put(True) -def check_shared_state(queue: StatusQueue) -> Any: + +def trigger_event(queue: Queue[T], data: T) -> None: + queue.put(data) + + +def check_shared_state(queue: StatusQueue[T]) -> Optional[T]: try: return queue.check() except Empty: return None -def update_shared_state(queue: StatusQueue, data: Any = None) -> None: - queue.update(data if data is not None else True) +def update_shared_state(queue: StatusQueue[T], data: T) -> None: + queue.update(data) -def check_for_event(queue: Queue) -> Any: +def check_for_event(queue: Queue[T]) -> Optional[T]: try: return queue.get(block=False) except Empty: diff --git a/src/isar/models/communication/queues/status_queue.py b/src/isar/models/communication/queues/status_queue.py index 7589ff29..d7a5dd6d 100644 --- a/src/isar/models/communication/queues/status_queue.py +++ b/src/isar/models/communication/queues/status_queue.py @@ -1,20 +1,22 @@ from collections import deque from queue import Empty, Queue -from typing import Any +from typing import TypeVar +T = TypeVar("T") -class StatusQueue(Queue): + +class StatusQueue(Queue[T]): def __init__(self) -> None: super().__init__() - def check(self) -> Any: + def check(self) -> T: if not self._qsize(): raise Empty with self.mutex: queueList = list(self.queue) return queueList.pop() - def update(self, item: Any): + def update(self, item: T): with self.mutex: - self.queue = deque() + self.queue: deque[T] = deque() self.queue.append(item) diff --git a/src/isar/robot/robot.py b/src/isar/robot/robot.py index df82c14d..8d255713 100644 --- a/src/isar/robot/robot.py +++ b/src/isar/robot/robot.py @@ -72,8 +72,8 @@ def _check_and_handle_start_mission(self, event: Queue) -> None: ) self.start_mission_thread.start() - def _check_and_handle_task_status_request(self, event: Queue) -> None: - task_id = check_for_event(event) + def _check_and_handle_task_status_request(self, event: Queue[str]) -> None: + task_id: str = check_for_event(event) if task_id: self.robot_task_status_thread = RobotTaskStatusThread( self.events, self.robot, self.signal_thread_quitting, task_id diff --git a/src/isar/robot/robot_stop_mission.py b/src/isar/robot/robot_stop_mission.py index 34da1c90..f6bbb3d2 100644 --- a/src/isar/robot/robot_stop_mission.py +++ b/src/isar/robot/robot_stop_mission.py @@ -5,8 +5,12 @@ from isar.config.settings import settings from isar.models.communication.queues.events import Events -from isar.models.communication.queues.queue_utils import trigger_event +from isar.models.communication.queues.queue_utils import ( + trigger_event, + trigger_event_without_data, +) from robot_interface.models.exceptions.robot_exceptions import ( + ErrorMessage, RobotActionException, RobotException, ) @@ -29,7 +33,7 @@ def __init__( def run(self) -> None: retries = 0 - error_description: Optional[str] = None + error: Optional[ErrorMessage] = None while retries < settings.STOP_ROBOT_ATTEMPTS_LIMIT: if self.signal_thread_quitting.wait(0): return @@ -42,20 +46,29 @@ def run(self) -> None: f"\nAttempting to stop the robot again" ) retries += 1 - error_description = e.error_description + error = ErrorMessage( + error_reason=e.error_reason, error_description=e.error_description + ) time.sleep(settings.FSM_SLEEP_TIME) continue - trigger_event(self.events.robot_service_events.mission_successfully_stopped) + trigger_event_without_data( + self.events.robot_service_events.mission_successfully_stopped + ) return - error_message = ( + error_description = ( f"\nFailed to stop the robot after {retries} attempts because: " - f"{error_description}" + f"{error.error_description}" f"\nBe aware that the robot may still be moving even though a stop has " "been attempted" ) + error_message = ErrorMessage( + error_reason=error.error_reason, + error_description=error_description, + ) + trigger_event( self.events.robot_service_events.mission_failed_to_stop, error_message ) diff --git a/src/isar/state_machine/states/idle.py b/src/isar/state_machine/states/idle.py index e7576e54..1d97b6a9 100644 --- a/src/isar/state_machine/states/idle.py +++ b/src/isar/state_machine/states/idle.py @@ -39,7 +39,9 @@ def _check_and_handle_stop_mission_event(self, event: Queue) -> bool: return True return False - def _check_and_handle_start_mission_event(self, event: Queue) -> bool: + def _check_and_handle_start_mission_event( + self, event: Queue[StartMissionMessage] + ) -> bool: start_mission: Optional[StartMissionMessage] = check_for_event(event) if start_mission: self.state_machine.start_mission(mission=start_mission.mission) @@ -47,8 +49,10 @@ def _check_and_handle_start_mission_event(self, event: Queue) -> bool: return True return False - def _check_and_handle_robot_status_event(self, event: StatusQueue) -> bool: - robot_status = check_shared_state(event) + def _check_and_handle_robot_status_event( + self, event: StatusQueue[RobotStatus] + ) -> bool: + robot_status: RobotStatus = check_shared_state(event) if robot_status == RobotStatus.Offline: self.state_machine.robot_turned_offline() # type: ignore return True diff --git a/src/isar/state_machine/states/monitor.py b/src/isar/state_machine/states/monitor.py index 553b763c..6530cc36 100644 --- a/src/isar/state_machine/states/monitor.py +++ b/src/isar/state_machine/states/monitor.py @@ -90,7 +90,7 @@ def _check_and_handle_task_status_failed_event(self, event: Queue) -> bool: elif not self.awaiting_task_status: trigger_event( self.events.state_machine_events.task_status_request, - self.state_machine.current_task, + self.state_machine.current_task.id, ) self.awaiting_task_status = True return False @@ -103,7 +103,7 @@ def _check_and_handle_task_status_event(self, event: Queue) -> bool: elif not self.awaiting_task_status: trigger_event( self.events.state_machine_events.task_status_request, - self.state_machine.current_task, + self.state_machine.current_task.id, ) self.awaiting_task_status = True return False diff --git a/src/isar/state_machine/states/offline.py b/src/isar/state_machine/states/offline.py index ebac3026..33a66716 100644 --- a/src/isar/state_machine/states/offline.py +++ b/src/isar/state_machine/states/offline.py @@ -27,7 +27,9 @@ def stop(self) -> None: def _run(self) -> None: while True: - robot_status = check_shared_state(self.shared_state.robot_status) + robot_status: RobotStatus = check_shared_state( + self.shared_state.robot_status + ) if robot_status == RobotStatus.BlockedProtectiveStop: transition = self.state_machine.robot_protective_stop_engaged # type: ignore break diff --git a/src/isar/state_machine/states/stop.py b/src/isar/state_machine/states/stop.py index b1bb8880..c2b0f60e 100644 --- a/src/isar/state_machine/states/stop.py +++ b/src/isar/state_machine/states/stop.py @@ -7,6 +7,7 @@ from isar.models.communication.queues.queue_utils import check_for_event from isar.services.utilities.threaded_request import ThreadedRequest +from robot_interface.models.exceptions.robot_exceptions import ErrorMessage if TYPE_CHECKING: from isar.state_machine.state_machine import StateMachine @@ -31,15 +32,15 @@ def stop(self) -> None: self.stop_thread = None self._count_number_retries = 0 - def _check_and_handle_failed_stop(self, event: Queue) -> bool: - error_message = check_for_event(event) + def _check_and_handle_failed_stop(self, event: Queue[ErrorMessage]) -> bool: + error_message: Optional[ErrorMessage] = check_for_event(event) if error_message is not None: - self.logger.warning(error_message) + self.logger.warning(error_message.error_description) self.state_machine.mission_stopped() # type: ignore return True return False - def _check_and_handle_successful_stop(self, event: Queue) -> bool: + def _check_and_handle_successful_stop(self, event: Queue[bool]) -> bool: if check_for_event(event): self.state_machine.mission_stopped() # type: ignore return True diff --git a/src/isar/state_machine/transitions/stop.py b/src/isar/state_machine/transitions/stop.py index 4ee06fae..7e9b0361 100644 --- a/src/isar/state_machine/transitions/stop.py +++ b/src/isar/state_machine/transitions/stop.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING -from isar.models.communication.queues.queue_utils import trigger_event +from isar.models.communication.queues.queue_utils import trigger_event_without_data if TYPE_CHECKING: from isar.state_machine.state_machine import StateMachine @@ -10,7 +10,7 @@ def trigger_stop_mission_event(state_machine: "StateMachine") -> bool: - trigger_event(state_machine.events.state_machine_events.stop_mission) + trigger_event_without_data(state_machine.events.state_machine_events.stop_mission) return True