Skip to content

Commit

Permalink
Refactor idle as an event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
andchiind committed Feb 27, 2025
1 parent f6ed8b4 commit 3f08659
Showing 1 changed file with 34 additions and 24 deletions.
58 changes: 34 additions & 24 deletions src/isar/state_machine/states/idle.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import logging
import time
from typing import TYPE_CHECKING, Callable, Optional
from queue import Queue
from typing import TYPE_CHECKING, Optional

from transitions import State

from isar.config.settings import settings
from isar.models.communication.message import StartMissionMessage
from isar.models.communication.queues.queue_utils import (
check_for_event,
check_shared_state,
)
from isar.models.communication.queues.status_queue import StatusQueue
from robot_interface.models.mission.status import RobotStatus

if TYPE_CHECKING:
Expand All @@ -32,36 +33,45 @@ def start(self) -> None:
def stop(self) -> None:
return

def _is_ready_to_poll_for_status(self) -> bool:
time_since_last_robot_status_poll = (
time.time() - self.last_robot_status_poll_time
)
return (
time_since_last_robot_status_poll > settings.ROBOT_API_STATUS_POLL_INTERVAL
)
def _check_and_handle_stop_mission_event(self, event: Queue) -> bool:
if check_for_event(event):
self.state_machine.stop() # type: ignore
return True
return False

def _check_and_handle_start_mission_event(self, event: Queue) -> bool:
start_mission: Optional[StartMissionMessage] = check_for_event(event)
if start_mission:
self.state_machine.start_mission(mission=start_mission.mission)
self.state_machine.request_mission_start() # type: ignore
return True
return False

def _check_and_handle_robot_status_event(self, event: StatusQueue) -> bool:
robot_status = check_shared_state(event)
if robot_status == RobotStatus.Offline:
self.state_machine.robot_turned_offline() # type: ignore
return True
elif robot_status == RobotStatus.BlockedProtectiveStop:
self.state_machine.robot_protective_stop_engaged() # type: ignore
return True
return False

def _run(self) -> None:
transition: Callable
while True:
if check_for_event(self.events.API_events.stop_mission.input):
transition = self.state_machine.stop # type: ignore
if self._check_and_handle_stop_mission_event(
self.events.API_events.stop_mission.input
):
break

start_mission: Optional[StartMissionMessage] = check_for_event(
if self._check_and_handle_start_mission_event(
self.events.API_events.start_mission.input
)
if start_mission:
self.state_machine.start_mission(mission=start_mission.mission)
transition = self.state_machine.request_mission_start # type: ignore
):
break

robot_status = check_shared_state(self.shared_state.robot_status)
if robot_status == RobotStatus.Offline:
transition = self.state_machine.robot_turned_offline # type: ignore
break
elif robot_status == RobotStatus.BlockedProtectiveStop:
transition = self.state_machine.robot_protective_stop_engaged # type: ignore
if self._check_and_handle_robot_status_event(
self.shared_state.robot_status
):
break

time.sleep(self.state_machine.sleep_time)
transition()

0 comments on commit 3f08659

Please sign in to comment.