State Machine
The StateMachine class provides an async state machine with built-in timeout support. It wraps the transitions library's AsyncMachine with the AsyncTimeout state feature, giving you states that can auto-transition after a configurable duration. This is useful for modelling device behaviour such as startup sequences, operating modes, and error recovery.
The transitions library is an optional dependency. Install it separately before using the state machine:
pip install transitions
If transitions is not installed, importing StateMachine will print a warning and the class will be None.
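Because the class is None when the dependency is missing, code that has to run in both situations can check before constructing a machine. The following is a minimal sketch: require_state_machine is a hypothetical helper, not part of pydoover, and the try/except exists only so that the snippet also runs where pydoover itself is not installed.

```python
try:
    from pydoover.state import StateMachine
except ImportError:
    # Assumption: pydoover is absent; treat it like the missing-dependency case
    StateMachine = None


def require_state_machine():
    """Return the StateMachine class, or raise a clear error if it is unavailable.

    Hypothetical helper for illustration; not part of pydoover.
    """
    if StateMachine is None:
        raise RuntimeError(
            "StateMachine is unavailable; install it with 'pip install transitions'"
        )
    return StateMachine
```

Calling the helper once at startup turns a later "NoneType is not callable" error into a message that names the missing package.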
Import
from pydoover.state import StateMachine
Defining States
States are defined as a list of dictionaries. Each state has a name and can optionally specify a timeout, a timeout callback, and entry and exit callbacks:
states = [
    {
        "name": "off",
        "on_enter": "on_enter_off",
        "on_exit": "on_exit_off",
    },
    {
        "name": "starting",
        "timeout": 30,  # Auto-transition after 30 seconds
        "on_timeout": "on_starting_timeout",
        "on_enter": "on_enter_starting",
    },
    {
        "name": "running",
        "on_enter": "on_enter_running",
    },
    {
        "name": "stopping",
        "timeout": 15,
        "on_timeout": "on_stopping_timeout",
    },
    {
        "name": "error",
        "timeout": 60,
        "on_timeout": "on_error_timeout",
        "on_enter": "on_enter_error",
    },
]
| Key | Type | Description |
|---|---|---|
| name | str | The state identifier. |
| timeout | float | Seconds before the on_timeout callback fires automatically. Optional. |
| on_timeout | str | Method or trigger name to call when the timeout expires. Passing a trigger name creates an auto-transition. |
| on_enter | str | Method name to call when entering this state. |
| on_exit | str | Method name to call when leaving this state. |
Callbacks are specified as strings referencing method names on the model object (typically self). Since trigger names also become methods on the model, you can pass a trigger name to on_timeout to create an auto-transition. For example, a "sleeping" state with "on_timeout": "awaken" fires the awaken trigger when the timeout expires, moving the machine to that trigger's destination.
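Since callbacks are plain strings, a misspelled name may only surface when the callback is first invoked. The sketch below checks a state list up front. find_unresolved_callbacks is a hypothetical helper (not part of pydoover or transitions), and it assumes each callback key holds a single string, as in the table above.

```python
def find_unresolved_callbacks(states, transitions, model):
    """Return (state, key, name) tuples for callback names that resolve to nothing.

    Hypothetical helper for illustration only.
    """
    trigger_names = {t["trigger"] for t in transitions}
    unresolved = []
    for state in states:
        for key in ("on_enter", "on_exit", "on_timeout"):
            name = state.get(key)
            if name is None:
                continue
            # A name is fine if it is a trigger (which becomes a model method)
            # or an existing callable attribute on the model.
            if name in trigger_names or callable(getattr(model, name, None)):
                continue
            unresolved.append((state["name"], key, name))
    return unresolved


class Model:
    async def on_enter_sleeping(self):
        pass


states = [
    {"name": "sleeping", "timeout": 900, "on_timeout": "awaken",
     "on_enter": "on_enter_sleeping"},
    {"name": "awake", "timeout": 120, "on_timeout": "goto_sleeep"},  # deliberate typo
]
transitions = [
    {"trigger": "awaken", "source": "sleeping", "dest": "awake"},
    {"trigger": "goto_sleep", "source": "awake", "dest": "sleeping"},
]

problems = find_unresolved_callbacks(states, transitions, Model())
print(problems)  # [('awake', 'on_timeout', 'goto_sleeep')]
```

Running a check like this before constructing the machine keeps configuration mistakes out of timeout paths that might only execute hours later.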
Defining Transitions
Transitions define the valid state changes and the triggers that cause them:
transitions = [
    {"trigger": "start", "source": "off", "dest": "starting"},
    {"trigger": "started", "source": "starting", "dest": "running"},
    {"trigger": "stop", "source": "running", "dest": "stopping"},
    {"trigger": "stopped", "source": "stopping", "dest": "off"},
    {"trigger": "fault", "source": "*", "dest": "error"},
    {"trigger": "reset", "source": "error", "dest": "off"},
]
| Key | Type | Description |
|---|---|---|
| trigger | str | The event name. Becomes an async method on the model. |
| source | str | The state this transition can fire from. Use "*" for any state. |
| dest | str | The target state. |
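To see how source matching plays out, including the "*" wildcard, the sketch below lists the triggers that are valid from a given state. triggers_from is a hypothetical helper that only inspects the transitions list. It does not call into the state machine, and it assumes every source is a single string as described in the table.

```python
def triggers_from(state, transitions):
    """List trigger names whose source matches `state`, in definition order."""
    names = []
    for t in transitions:
        # "*" matches every state; otherwise the source must equal the state
        if t["source"] in ("*", state) and t["trigger"] not in names:
            names.append(t["trigger"])
    return names


transitions = [
    {"trigger": "start", "source": "off", "dest": "starting"},
    {"trigger": "started", "source": "starting", "dest": "running"},
    {"trigger": "stop", "source": "running", "dest": "stopping"},
    {"trigger": "stopped", "source": "stopping", "dest": "off"},
    {"trigger": "fault", "source": "*", "dest": "error"},
    {"trigger": "reset", "source": "error", "dest": "off"},
]

print(triggers_from("running", transitions))  # ['stop', 'fault']
print(triggers_from("error", transitions))    # ['fault', 'reset']
```

Note that fault is available from error as well, because the wildcard source includes the error state itself.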
Creating the State Machine
Pass the states and transitions to the StateMachine constructor. The model parameter specifies the object that receives trigger methods and callbacks:
machine = StateMachine(
    states=states,
    transitions=transitions,
    model=self,     # Trigger methods and callbacks are added to this object
    initial="off",  # Starting state
    queued=True,    # Queue transitions for async safety
)
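Setting queued=True is recommended for async applications. With queuing enabled, a trigger called from inside a callback is added to a queue and processed only after the current transition has finished, so transitions run one at a time in the order they were requested instead of nesting inside one another. This prevents race conditions between overlapping transitions.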
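The ordering that queuing provides can be illustrated with a toy dispatcher built on asyncio alone. This is a simplified stand-in written for this page, not the library's implementation; QueuedDispatcher and the handler names are hypothetical.

```python
import asyncio
from collections import deque


class QueuedDispatcher:
    """Toy dispatcher: events raised while one is being handled wait their turn."""

    def __init__(self):
        self._queue = deque()
        self._running = False
        self.log = []

    async def fire(self, name, handler):
        self._queue.append((name, handler))
        if self._running:
            return  # Already processing: the event just waits in the queue
        self._running = True
        try:
            while self._queue:
                current, current_handler = self._queue.popleft()
                self.log.append(f"begin {current}")
                await current_handler(self)
                self.log.append(f"end {current}")
        finally:
            self._running = False


async def on_start(dispatcher):
    # Fired from inside a handler, like calling a trigger inside a callback
    await dispatcher.fire("fault", on_fault)


async def on_fault(dispatcher):
    pass


async def demo():
    dispatcher = QueuedDispatcher()
    await dispatcher.fire("start", on_start)
    return dispatcher.log


log = asyncio.run(demo())
print(log)  # ['begin start', 'end start', 'begin fault', 'end fault']
```

The fault event is requested while start is still being handled, yet it only begins after start has ended. Without the queue, the two would be interleaved.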
After construction, each trigger name becomes an async method on the model:
await self.start()    # Transitions from "off" to "starting"
await self.started()  # Transitions from "starting" to "running"
await self.fault()    # Transitions from any state to "error"
Callbacks
All callbacks are async methods on the model object. They receive no arguments beyond self:
async def on_enter_starting(self):
    """Called when entering the 'starting' state."""
    await self.send_start_command()

async def on_starting_timeout(self):
    """Called if 'starting' state times out after 30 seconds."""
    await self.fault()  # Transition to error state

async def on_enter_error(self):
    """Called when entering the 'error' state."""
    await self.send_notification("Device entered error state")

async def on_error_timeout(self):
    """Called after 60 seconds in 'error' state."""
    await self.reset()  # Attempt recovery by returning to 'off'
Checking State
The current state is available on the model object:
if self.state == "running":
    # Device is operational
    pass
Modifying State Properties at Runtime
Use get_state() on the state machine to access a state object and modify its properties. This is useful for adjusting timeouts based on runtime conditions like battery voltage or network quality.
sleeping_state = self.state_machine.get_state("sleeping")
sleeping_state.timeout = new_sleep_duration
Changes take effect the next time the state is entered.
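One way to picture why a change only applies on the next entry is a timer that reads the timeout at the moment the state is entered. The sketch below is a toy model under that assumption, using asyncio only. TimedState is hypothetical and is not the library's state class.

```python
import asyncio


class TimedState:
    """Toy state whose timer is armed on entry and cancelled on exit."""

    def __init__(self, name, timeout, on_timeout):
        self.name = name
        self.timeout = timeout
        self.on_timeout = on_timeout
        self._timer = None

    def enter(self):
        # The timeout is read here, so later edits affect only the next entry
        self._timer = asyncio.create_task(self._run(self.timeout))

    def exit(self):
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

    async def _run(self, delay):
        await asyncio.sleep(delay)
        await self.on_timeout()


async def demo():
    events = []

    async def on_timeout():
        events.append("timeout")

    state = TimedState("sleeping", timeout=0.05, on_timeout=on_timeout)
    state.enter()
    state.timeout = 10        # Changed while the timer is already running
    await asyncio.sleep(0.2)  # The original 0.05 s timer still fires
    state.exit()
    return events


events = asyncio.run(demo())
print(events)  # ['timeout']
```

In practice this means a timeout should be adjusted before the transition into the state, for example at the start of each loop iteration.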
Practical Example
This example models a pump controller with five states: off, starting, running, stopping, and error. The starting and stopping states have timeouts that transition to the error state if the pump fails to respond. The error state auto-recovers after 60 seconds.
from pydoover.state import StateMachine


class PumpController:
    # Assumes the host application provides self.platform,
    # self.update_aggregate() and self.send_notification();
    # they are not defined in this snippet.

    def __init__(self):
        states = [
            {"name": "off"},
            {
                "name": "starting",
                "timeout": 30,
                "on_timeout": "on_start_timeout",
                "on_enter": "on_enter_starting",
            },
            {
                "name": "running",
                "on_enter": "on_enter_running",
            },
            {
                "name": "stopping",
                "timeout": 15,
                "on_timeout": "on_stop_timeout",
                "on_enter": "on_enter_stopping",
            },
            {
                "name": "error",
                "timeout": 60,
                "on_timeout": "on_error_timeout",
                "on_enter": "on_enter_error",
            },
        ]

        transitions = [
            {"trigger": "begin_start", "source": "off", "dest": "starting"},
            {"trigger": "confirm_running", "source": "starting", "dest": "running"},
            {"trigger": "begin_stop", "source": "running", "dest": "stopping"},
            {"trigger": "confirm_stopped", "source": "stopping", "dest": "off"},
            {"trigger": "fault", "source": "*", "dest": "error"},
            {"trigger": "recover", "source": "error", "dest": "off"},
        ]

        self.machine = StateMachine(
            states=states,
            transitions=transitions,
            model=self,
            initial="off",
            queued=True,
        )

    async def on_enter_starting(self):
        """Send the start command to the pump hardware."""
        await self.platform.set_do(1, True)

    async def on_start_timeout(self):
        """Pump did not confirm running within 30 seconds."""
        await self.fault()

    async def on_enter_running(self):
        """Pump is confirmed running."""
        await self.update_aggregate({"pump_state": "running"})

    async def on_enter_stopping(self):
        """Send the stop command to the pump hardware."""
        await self.platform.set_do(1, False)

    async def on_stop_timeout(self):
        """Pump did not confirm stopped within 15 seconds."""
        await self.fault()

    async def on_enter_error(self):
        """Pump entered error state. Notify operator."""
        await self.platform.set_do(1, False)  # Safety: ensure pump is off
        await self.send_notification("Pump fault detected")
        await self.update_aggregate({"pump_state": "error"})

    async def on_error_timeout(self):
        """Attempt automatic recovery after 60 seconds."""
        await self.recover()

    async def main_loop(self):
        # Check pump feedback and trigger transitions
        pump_feedback = await self.platform.fetch_di(2)
        if self.state == "starting" and pump_feedback:
            await self.confirm_running()
        elif self.state == "stopping" and not pump_feedback:
            await self.confirm_stopped()
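The feedback check in main_loop can also be factored into a pure function, which makes the decision logic testable without pump hardware or a running machine. A minimal sketch follows; next_trigger is a hypothetical helper and not part of the example application.

```python
from typing import Optional


def next_trigger(state: str, pump_feedback: bool) -> Optional[str]:
    """Map the current state and feedback input to a trigger name, if any."""
    if state == "starting" and pump_feedback:
        return "confirm_running"
    if state == "stopping" and not pump_feedback:
        return "confirm_stopped"
    return None  # No transition is due in any other combination


print(next_trigger("starting", True))   # confirm_running
print(next_trigger("stopping", False))  # confirm_stopped
print(next_trigger("running", True))    # None
```

Inside main_loop, the result could then be dispatched with getattr(self, trigger)() whenever it is not None, since each trigger name is a method on the model.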
A second example comes from a water meter application, which uses StateMachine to manage a sleep/wake cycle for a battery-powered device. The sleeping state timeout is adjusted dynamically based on battery voltage: lower voltage means longer sleep to conserve power. Each on_timeout references a trigger name directly, so the state machine auto-transitions without explicit timeout callback methods:
from typing import Optional

from pydoover.state import StateMachine

DEFAULT_SLEEP_TIME = 60 * 15  # 15 minutes
DEFAULT_AWAKE_TIME = 60 * 2
DEFAULT_WAKE_DELAY = 15


def get_sleep_time(voltage: Optional[float]) -> float:
    if voltage is None or voltage > 12.9:
        return DEFAULT_SLEEP_TIME
    if 12.5 < voltage <= 12.9:
        return DEFAULT_SLEEP_TIME * 1.5
    if 12.2 < voltage <= 12.5:
        return DEFAULT_SLEEP_TIME * 3
    return DEFAULT_SLEEP_TIME * 5  # voltage <= 12.2


class MaceWaterMeterState:
    states = [
        {"name": "initial"},
        {"name": "sleeping", "timeout": DEFAULT_SLEEP_TIME, "on_timeout": "awaken"},
        {"name": "awake_init", "timeout": DEFAULT_WAKE_DELAY, "on_timeout": "initialised"},
        {"name": "awake_rt", "timeout": DEFAULT_AWAKE_TIME, "on_timeout": "goto_sleep"},
    ]

    transitions = [
        {"trigger": "initialise", "source": "initial", "dest": "awake_init"},
        {"trigger": "awaken", "source": "sleeping", "dest": "awake_init"},
        {"trigger": "initialised", "source": "awake_init", "dest": "awake_rt"},
        {"trigger": "goto_sleep", "source": "awake_init", "dest": "sleeping"},
        {"trigger": "goto_sleep", "source": "awake_rt", "dest": "sleeping"},
    ]

    def __init__(self):
        self.state_machine = StateMachine(
            states=self.states,
            transitions=self.transitions,
            model=self,
            initial="initial",
            queued=True,
        )
        self.should_request = False

    async def spin(self, battery_voltage: float):
        # Adapt the sleep duration to the latest battery reading
        self.state_machine.get_state("sleeping").timeout = get_sleep_time(
            battery_voltage
        )

        if self.state == "initial":
            await self.initialise()

    # The two methods below are not listed in `states`: the transitions library
    # registers model methods named on_enter_<state> as entry callbacks by name.
    async def on_enter_sleeping(self):
        self.should_request = False

    async def on_enter_awake_init(self):
        self.should_request = True
This pattern, in which on_timeout points to a trigger for auto-transitions and get_state().timeout provides adaptive timing, is common in battery-powered devices that need to balance data freshness against power consumption.
Source: mace-water-meter
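The voltage tiers in get_sleep_time can alternatively be written as a lookup table, which keeps the thresholds in one place if more tiers are added later. This is a sketch of an equivalent formulation, not the code used by the water meter application. SLEEP_TIERS and LOW_BATTERY_MULTIPLIER are names introduced here for illustration.

```python
DEFAULT_SLEEP_TIME = 60 * 15  # 15 minutes, as in the example above

# (voltage threshold, multiplier), checked from highest to lowest.
# A tier applies when the voltage is strictly above its threshold.
SLEEP_TIERS = [
    (12.9, 1),
    (12.5, 1.5),
    (12.2, 3),
]
LOW_BATTERY_MULTIPLIER = 5  # Applies at 12.2 V and below


def get_sleep_time(voltage):
    """Return the sleep duration in seconds for a battery voltage (or None)."""
    if voltage is None:
        return DEFAULT_SLEEP_TIME
    for threshold, multiplier in SLEEP_TIERS:
        if voltage > threshold:
            return DEFAULT_SLEEP_TIME * multiplier
    return DEFAULT_SLEEP_TIME * LOW_BATTERY_MULTIPLIER


for v in (None, 13.0, 12.7, 12.3, 12.0):
    print(v, get_sleep_time(v))
```

The boundary behaviour matches the original function: a reading of exactly 12.9 V falls into the 1.5x tier, and exactly 12.2 V falls into the 5x tier.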
Related Pages
- Utilities Overview -- all available utility modules
- Alarms -- threshold monitoring for triggering state transitions
- Async Helpers -- async utility functions used alongside state machines