Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,16 @@ class MyMachine(StateChart):
```{eval-rst}
.. autofunction:: statemachine.io.create_machine_class_from_definition
```

## timeout

```{versionadded} 3.0.0
```

```{seealso}
{ref}`timeout` how-to guide.
```

```{eval-rst}
.. autofunction:: statemachine.contrib.timeout.timeout
```
2 changes: 1 addition & 1 deletion docs/how-to/coming_from_transitions.md
Original file line number Diff line number Diff line change
Expand Up @@ -960,4 +960,4 @@ See {ref}`validations` for the full list.
| Ordered transitions | Yes | Via explicit wiring |
| Tags on states | Yes | Via subclassing |
| {ref}`Machine nesting (children) <invoke>` | Yes | Yes (invoke) |
| Timeout transitions | Yes | {ref}`Yes <delayed-events>` |
| {ref}`Timeout transitions <timeout>` | Yes | Yes |
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ invoke
models
integrations
weighted_transitions
timeout
```

```{toctree}
Expand Down
25 changes: 25 additions & 0 deletions docs/releases/3.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,31 @@ through the existing condition system — no engine changes required.
See {ref}`weighted-transitions` for full documentation.


#### State timeouts

A new contrib module `statemachine.contrib.timeout` provides a `timeout()` invoke helper
for per-state watchdog timers. When a state is entered, a background timer starts; if the
state is not exited before the timer expires, an event is sent automatically. The timer is
cancelled on state exit, with no manual cleanup needed.

```py
>>> from statemachine import State, StateChart
>>> from statemachine.contrib.timeout import timeout

>>> class WaitingMachine(StateChart):
... waiting = State(initial=True, invoke=timeout(5, on="expired"))
... timed_out = State(final=True)
... expired = waiting.to(timed_out)

>>> sm = WaitingMachine()
>>> sm.waiting.is_active
True

```

See {ref}`timeout` for full documentation.


#### Create state machine from a dict definition

Dynamically create state machine classes using
Expand Down
92 changes: 92 additions & 0 deletions docs/timeout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
(timeout)=
# State timeouts

A common need is preventing a state machine from getting stuck — for example,
a "waiting for response" state that should time out after a few seconds. The
{func}`~statemachine.contrib.timeout.timeout` helper makes this easy by
leveraging the {ref}`invoke <invoke>` system: a background timer starts when
the state is entered and is automatically cancelled when the state is exited.

## Basic usage

When the timeout expires and no custom event is specified, the standard
`done.invoke.<state>` event fires — just like any other invoke completion:

```py
>>> from statemachine import State, StateChart
>>> from statemachine.contrib.timeout import timeout

>>> class WaitingMachine(StateChart):
... waiting = State(initial=True, invoke=timeout(5))
... done = State(final=True)
... done_invoke_waiting = waiting.to(done)

>>> sm = WaitingMachine()
>>> sm.waiting.is_active
True

```

In this example, if the machine stays in `waiting` for 5 seconds,
`done.invoke.waiting` fires and the machine transitions to `done`.
If any other event causes a transition out of `waiting` first,
the timer is cancelled automatically.


## Custom timeout event

Use the `on` parameter to send a specific event name instead of
`done.invoke.<state>`. This is useful when you want to distinguish
timeouts from normal completions:

```py
>>> from statemachine import State, StateChart
>>> from statemachine.contrib.timeout import timeout

>>> class RequestMachine(StateChart):
... requesting = State(initial=True, invoke=timeout(30, on="request_timeout"))
... timed_out = State(final=True)
... request_timeout = requesting.to(timed_out)

>>> sm = RequestMachine()
>>> sm.requesting.is_active
True

```

## Composing with other invoke handlers

Since `timeout()` returns a standard invoke handler, you can combine it with
other handlers in a list. The first handler to complete and trigger a transition
wins — the state exit cancels everything else:

```py
>>> from statemachine import State, StateChart
>>> from statemachine.contrib.timeout import timeout

>>> def fetch_data():
... return {"status": "ok"}

>>> class LoadingMachine(StateChart):
... loading = State(initial=True, invoke=[fetch_data, timeout(30, on="too_slow")])
... ready = State(final=True)
... stuck = State(final=True)
... done_invoke_loading = loading.to(ready)
... too_slow = loading.to(stuck)

>>> sm = LoadingMachine()
>>> sm.ready.is_active
True

```

In this example:
- If `fetch_data` completes within 30 seconds, `done.invoke.loading` fires
and transitions to `ready`, cancelling the timeout.
- If 30 seconds pass first, `too_slow` fires and transitions to `stuck`,
cancelling the `fetch_data` invoke.


## API reference

See {func}`~statemachine.contrib.timeout.timeout` in the {ref}`API docs <api>`.
68 changes: 68 additions & 0 deletions statemachine/contrib/timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""Timeout helper for state invocations.

Provides a ``timeout()`` function that returns an :class:`~statemachine.invoke.IInvoke`
handler. When a state is entered, the handler waits for the given duration; if the state
is not exited before the timer expires, an event is sent to the machine.

Example::

from statemachine.contrib.timeout import timeout

class MyMachine(StateChart):
waiting = State(initial=True, invoke=timeout(5, on="expired"))
timed_out = State(final=True)
expired = waiting.to(timed_out)
"""

from typing import TYPE_CHECKING
from typing import Any

if TYPE_CHECKING:
from statemachine.invoke import InvokeContext


class _Timeout:
"""IInvoke handler that waits for a duration and optionally sends an event."""

def __init__(self, duration: float, on: "str | None" = None):
self.duration = duration
self.on = on

def run(self, ctx: "InvokeContext") -> Any:
"""Wait for the timeout duration, then optionally send an event.

If the owning state is exited before the timer expires (``ctx.cancelled``
is set), the handler returns immediately without sending anything.
"""
fired = not ctx.cancelled.wait(timeout=self.duration)
if not fired:
# State was exited before the timeout — nothing to do.
return None
if self.on is not None:
ctx.send(self.on)
return None

def __repr__(self) -> str:
args = f"{self.duration}"
if self.on is not None:
args += f", on={self.on!r}"
return f"timeout({args})"


def timeout(duration: float, *, on: "str | None" = None) -> _Timeout:
"""Create a timeout invoke handler.

Args:
duration: Time in seconds to wait before firing.
on: Event name to send when the timeout expires. If ``None``, the
standard ``done.invoke.<state>`` event fires via invoke completion.

Returns:
An :class:`~statemachine.invoke.IInvoke`-compatible handler.

Raises:
ValueError: If *duration* is not positive.
"""
if duration <= 0:
raise ValueError(f"timeout duration must be positive, got {duration}")
return _Timeout(duration=duration, on=on)
139 changes: 139 additions & 0 deletions tests/test_contrib_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Tests for the timeout contrib module."""

import threading

import pytest
from statemachine.contrib.timeout import _Timeout
from statemachine.contrib.timeout import timeout

from statemachine import State
from statemachine import StateChart


class TestTimeoutValidation:
def test_positive_duration(self):
t = timeout(5)
assert isinstance(t, _Timeout)
assert t.duration == 5

def test_zero_duration_raises(self):
with pytest.raises(ValueError, match="must be positive"):
timeout(0)

def test_negative_duration_raises(self):
with pytest.raises(ValueError, match="must be positive"):
timeout(-1)

def test_repr_without_on(self):
assert repr(timeout(5)) == "timeout(5)"

def test_repr_with_on(self):
assert repr(timeout(3.5, on="expired")) == "timeout(3.5, on='expired')"


class TestTimeoutBasic:
"""Timeout fires done.invoke.<state> when no custom event is specified."""

async def test_timeout_fires_done_invoke(self, sm_runner):
class SM(StateChart):
waiting = State(initial=True, invoke=timeout(0.05))
done = State(final=True)
done_invoke_waiting = waiting.to(done)

sm = await sm_runner.start(SM)
await sm_runner.sleep(0.15)
await sm_runner.processing_loop(sm)

assert "done" in sm.configuration_values

async def test_timeout_cancelled_on_early_exit(self, sm_runner):
"""If the machine transitions out before the timeout, nothing fires."""

class SM(StateChart):
waiting = State(initial=True, invoke=timeout(10))
other = State(final=True)
go = waiting.to(other)
# No done_invoke_waiting — would fail if timeout fired unexpectedly
done_invoke_waiting = waiting.to(waiting)

sm = await sm_runner.start(SM)
await sm_runner.send(sm, "go")

assert "other" in sm.configuration_values


class TestTimeoutCustomEvent:
"""Timeout fires a custom event via the `on` parameter."""

async def test_custom_event_fires(self, sm_runner):
class SM(StateChart):
waiting = State(initial=True, invoke=timeout(0.05, on="expired"))
timed_out = State(final=True)
expired = waiting.to(timed_out)

sm = await sm_runner.start(SM)
await sm_runner.sleep(0.15)
await sm_runner.processing_loop(sm)

assert "timed_out" in sm.configuration_values

async def test_custom_event_cancelled_on_early_exit(self, sm_runner):
class SM(StateChart):
waiting = State(initial=True, invoke=timeout(10, on="expired"))
other = State(final=True)
go = waiting.to(other)
expired = waiting.to(waiting)

sm = await sm_runner.start(SM)
await sm_runner.send(sm, "go")

assert "other" in sm.configuration_values


class TestTimeoutComposition:
"""Timeout combined with other invoke handlers — first to complete wins."""

async def test_invoke_completes_before_timeout(self, sm_runner):
"""A fast invoke handler transitions out, cancelling the timeout."""

def fast_handler():
return "fast_result"

class SM(StateChart):
loading = State(initial=True, invoke=[fast_handler, timeout(10, on="too_slow")])
ready = State(final=True)
stuck = State(final=True)
done_invoke_loading = loading.to(ready)
too_slow = loading.to(stuck)

sm = await sm_runner.start(SM)
await sm_runner.sleep(0.15)
await sm_runner.processing_loop(sm)

assert "ready" in sm.configuration_values

async def test_timeout_fires_before_slow_invoke(self, sm_runner):
"""Timeout fires while a slow invoke handler is still running."""
handler_cancelled = threading.Event()

class SlowHandler:
def run(self, ctx):
# Wait until cancelled (state exit) — simulates long-running work
ctx.cancelled.wait()
handler_cancelled.set()

class SM(StateChart):
loading = State(initial=True, invoke=[SlowHandler(), timeout(0.05, on="too_slow")])
ready = State(final=True)
stuck = State(final=True)
done_invoke_loading = loading.to(ready)
too_slow = loading.to(stuck)

sm = await sm_runner.start(SM)
await sm_runner.sleep(0.15)
await sm_runner.processing_loop(sm)

assert "stuck" in sm.configuration_values
# The slow handler should have been cancelled when the state exited
handler_cancelled.wait(timeout=2)
assert handler_cancelled.is_set()
Loading