Skip to content

Commit

Permalink
Add type hinting to event queues
Browse files Browse the repository at this point in the history
  • Loading branch information
andchiind committed Feb 28, 2025
1 parent 67e11b1 commit b63b1c9
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 47 deletions.
35 changes: 20 additions & 15 deletions src/isar/models/communication/queues/events.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
from queue import Queue

from transitions import State

from isar.config.settings import settings
from isar.models.communication.queues.queue_io import QueueIO
from isar.models.communication.queues.status_queue import StatusQueue
from robot_interface.models.exceptions.robot_exceptions import ErrorMessage
from robot_interface.models.mission.mission import Mission
from robot_interface.models.mission.status import RobotStatus, TaskStatus
from robot_interface.models.mission.task import TASKS


class Events:
Expand All @@ -27,26 +33,25 @@ def __init__(self) -> None:

class StateMachineEvents:
def __init__(self) -> None:
self.start_mission: Queue = Queue(maxsize=1)
self.stop_mission: Queue = Queue(maxsize=1)
self.pause_mission: Queue = Queue(maxsize=1)
self.task_status_request: Queue = Queue(maxsize=1)
self.robot_status_request: Queue = Queue(maxsize=1)
self.start_mission: Queue[Mission] = Queue(maxsize=1)
self.stop_mission: Queue[bool] = Queue(maxsize=1)
self.pause_mission: Queue[bool] = Queue(maxsize=1)
self.task_status_request: Queue[str] = Queue(maxsize=1)


class RobotServiceEvents:
def __init__(self) -> None:
self.task_status_updated: Queue = Queue(maxsize=1)
self.task_status_failed: Queue = Queue(maxsize=1)
self.mission_started: Queue = Queue(maxsize=1)
self.mission_failed: Queue = Queue(maxsize=1)
self.robot_status_changed: Queue = Queue(maxsize=1)
self.mission_failed_to_stop: Queue = Queue(maxsize=1)
self.mission_successfully_stopped: Queue = Queue(maxsize=1)
self.task_status_updated: Queue[TaskStatus] = Queue(maxsize=1)
self.task_status_failed: Queue[ErrorMessage] = Queue(maxsize=1)
self.mission_started: Queue[bool] = Queue(maxsize=1)
self.mission_failed: Queue[ErrorMessage] = Queue(maxsize=1)
self.robot_status_changed: Queue[bool] = Queue(maxsize=1)
self.mission_failed_to_stop: Queue[ErrorMessage] = Queue(maxsize=1)
self.mission_successfully_stopped: Queue[bool] = Queue(maxsize=1)


class SharedState:
def __init__(self) -> None:
self.state: StatusQueue = StatusQueue()
self.robot_status: StatusQueue = StatusQueue()
self.state_machine_current_task: StatusQueue = StatusQueue()
self.state: StatusQueue[State] = StatusQueue()
self.robot_status: StatusQueue[RobotStatus] = StatusQueue()
self.state_machine_current_task: StatusQueue[TASKS] = StatusQueue()
20 changes: 13 additions & 7 deletions src/isar/models/communication/queues/queue_utils.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
from queue import Empty, Queue
from typing import Any
from typing import Optional, TypeVar

from isar.models.communication.queues.status_queue import StatusQueue

T = TypeVar("T")

def trigger_event(queue: Queue, data: Any = None) -> None:
queue.put(data if data is not None else True)

def trigger_event_without_data(queue: Queue[bool]) -> None:
queue.put(True)

def check_shared_state(queue: StatusQueue) -> Any:

def trigger_event(queue: Queue[T], data: T) -> None:
queue.put(data)


def check_shared_state(queue: StatusQueue[T]) -> Optional[T]:
try:
return queue.check()
except Empty:
return None


def update_shared_state(queue: StatusQueue, data: Any = None) -> None:
queue.update(data if data is not None else True)
def update_shared_state(queue: StatusQueue[T], data: T) -> None:
queue.update(data)


def check_for_event(queue: Queue) -> Any:
def check_for_event(queue: Queue[T]) -> Optional[T]:
try:
return queue.get(block=False)
except Empty:
Expand Down
12 changes: 7 additions & 5 deletions src/isar/models/communication/queues/status_queue.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
from collections import deque
from queue import Empty, Queue
from typing import Any
from typing import TypeVar

T = TypeVar("T")

class StatusQueue(Queue):

class StatusQueue(Queue[T]):
def __init__(self) -> None:
super().__init__()

def check(self) -> Any:
def check(self) -> T:
if not self._qsize():
raise Empty
with self.mutex:
queueList = list(self.queue)
return queueList.pop()

def update(self, item: Any):
def update(self, item: T):
with self.mutex:
self.queue = deque()
self.queue: deque[T] = deque()
self.queue.append(item)
4 changes: 2 additions & 2 deletions src/isar/robot/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ def _check_and_handle_start_mission(self, event: Queue) -> None:
)
self.start_mission_thread.start()

def _check_and_handle_task_status_request(self, event: Queue) -> None:
task_id = check_for_event(event)
def _check_and_handle_task_status_request(self, event: Queue[str]) -> None:
task_id: str = check_for_event(event)
if task_id:
self.robot_task_status_thread = RobotTaskStatusThread(
self.events, self.robot, self.signal_thread_quitting, task_id
Expand Down
25 changes: 19 additions & 6 deletions src/isar/robot/robot_stop_mission.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@

from isar.config.settings import settings
from isar.models.communication.queues.events import Events
from isar.models.communication.queues.queue_utils import trigger_event
from isar.models.communication.queues.queue_utils import (
trigger_event,
trigger_event_without_data,
)
from robot_interface.models.exceptions.robot_exceptions import (
ErrorMessage,
RobotActionException,
RobotException,
)
Expand All @@ -29,7 +33,7 @@ def __init__(

def run(self) -> None:
retries = 0
error_description: Optional[str] = None
error: Optional[ErrorMessage] = None
while retries < settings.STOP_ROBOT_ATTEMPTS_LIMIT:
if self.signal_thread_quitting.wait(0):
return
Expand All @@ -42,20 +46,29 @@ def run(self) -> None:
f"\nAttempting to stop the robot again"
)
retries += 1
error_description = e.error_description
error = ErrorMessage(
error_reason=e.error_reason, error_description=e.error_description
)
time.sleep(settings.FSM_SLEEP_TIME)
continue

trigger_event(self.events.robot_service_events.mission_successfully_stopped)
trigger_event_without_data(
self.events.robot_service_events.mission_successfully_stopped
)
return

error_message = (
error_description = (
f"\nFailed to stop the robot after {retries} attempts because: "
f"{error_description}"
f"{error.error_description}"
f"\nBe aware that the robot may still be moving even though a stop has "
"been attempted"
)

error_message = ErrorMessage(
error_reason=error.error_reason,
error_description=error_description,
)

trigger_event(
self.events.robot_service_events.mission_failed_to_stop, error_message
)
10 changes: 7 additions & 3 deletions src/isar/state_machine/states/idle.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,20 @@ def _check_and_handle_stop_mission_event(self, event: Queue) -> bool:
return True
return False

def _check_and_handle_start_mission_event(self, event: Queue) -> bool:
def _check_and_handle_start_mission_event(
self, event: Queue[StartMissionMessage]
) -> bool:
start_mission: Optional[StartMissionMessage] = check_for_event(event)
if start_mission:
self.state_machine.start_mission(mission=start_mission.mission)
self.state_machine.request_mission_start() # type: ignore
return True
return False

def _check_and_handle_robot_status_event(self, event: StatusQueue) -> bool:
robot_status = check_shared_state(event)
def _check_and_handle_robot_status_event(
self, event: StatusQueue[RobotStatus]
) -> bool:
robot_status: RobotStatus = check_shared_state(event)
if robot_status == RobotStatus.Offline:
self.state_machine.robot_turned_offline() # type: ignore
return True
Expand Down
4 changes: 2 additions & 2 deletions src/isar/state_machine/states/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def _check_and_handle_task_status_failed_event(self, event: Queue) -> bool:
elif not self.awaiting_task_status:
trigger_event(
self.events.state_machine_events.task_status_request,
self.state_machine.current_task,
self.state_machine.current_task.id,
)
self.awaiting_task_status = True
return False
Expand All @@ -103,7 +103,7 @@ def _check_and_handle_task_status_event(self, event: Queue) -> bool:
elif not self.awaiting_task_status:
trigger_event(
self.events.state_machine_events.task_status_request,
self.state_machine.current_task,
self.state_machine.current_task.id,
)
self.awaiting_task_status = True
return False
Expand Down
4 changes: 3 additions & 1 deletion src/isar/state_machine/states/offline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def stop(self) -> None:

def _run(self) -> None:
while True:
robot_status = check_shared_state(self.shared_state.robot_status)
robot_status: RobotStatus = check_shared_state(
self.shared_state.robot_status
)
if robot_status == RobotStatus.BlockedProtectiveStop:
transition = self.state_machine.robot_protective_stop_engaged # type: ignore
break
Expand Down
9 changes: 5 additions & 4 deletions src/isar/state_machine/states/stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from isar.models.communication.queues.queue_utils import check_for_event
from isar.services.utilities.threaded_request import ThreadedRequest
from robot_interface.models.exceptions.robot_exceptions import ErrorMessage

if TYPE_CHECKING:
from isar.state_machine.state_machine import StateMachine
Expand All @@ -31,15 +32,15 @@ def stop(self) -> None:
self.stop_thread = None
self._count_number_retries = 0

def _check_and_handle_failed_stop(self, event: Queue) -> bool:
error_message = check_for_event(event)
def _check_and_handle_failed_stop(self, event: Queue[ErrorMessage]) -> bool:
error_message: Optional[ErrorMessage] = check_for_event(event)
if error_message is not None:
self.logger.warning(error_message)
self.logger.warning(error_message.error_description)
self.state_machine.mission_stopped() # type: ignore
return True
return False

def _check_and_handle_successful_stop(self, event: Queue) -> bool:
def _check_and_handle_successful_stop(self, event: Queue[bool]) -> bool:
if check_for_event(event):
self.state_machine.mission_stopped() # type: ignore
return True
Expand Down
4 changes: 2 additions & 2 deletions src/isar/state_machine/transitions/stop.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING

from isar.models.communication.queues.queue_utils import trigger_event
from isar.models.communication.queues.queue_utils import trigger_event_without_data

if TYPE_CHECKING:
from isar.state_machine.state_machine import StateMachine
Expand All @@ -10,7 +10,7 @@


def trigger_stop_mission_event(state_machine: "StateMachine") -> bool:
trigger_event(state_machine.events.state_machine_events.stop_mission)
trigger_event_without_data(state_machine.events.state_machine_events.stop_mission)
return True


Expand Down

0 comments on commit b63b1c9

Please sign in to comment.