Application Examples
This page provides complete, working examples of pydoover Docker applications demonstrating various features and patterns.
Basic Application
A minimal application that demonstrates the core structure:
from pydoover.docker import Application, run_app
from pydoover.config import Schema, String, Integer, Boolean
class BasicConfig(Schema):
    """Configuration schema for the basic example application."""

    def __init__(self):
        # Name interpolated into the startup message printed by the app.
        self.device_name = String("Device Name", default="sensor-001")
        # Copied into the application's loop_target_period during setup.
        self.poll_interval = Integer(
            "Poll Interval (seconds)",
            default=5,
            min_val=1,
        )
        # Turns on the extra debug print in the main loop.
        self.debug_mode = Boolean("Debug Mode", default=False)
class BasicApp(Application):
    """Minimal application that publishes a status tag and a heartbeat tag."""

    config: BasicConfig

    def setup(self):
        """Run once at application start: announce, tag, and set the loop period."""
        print(f"Starting {self.config.device_name.value}")
        self.set_tag("app_status", "running")

        # The configured poll interval becomes the main loop period.
        period = self.config.poll_interval.value
        self.loop_target_period = period

    def main_loop(self):
        """Run repeatedly at the loop_target_period interval."""
        debug_enabled = self.config.debug_mode.value
        if debug_enabled:
            print("Debug: Main loop running")

        # Your application logic here
        self.set_tag("heartbeat", True)
if __name__ == "__main__":
    # Build the application with its schema, then hand it to the runner.
    app = BasicApp(config=BasicConfig())
    run_app(app)
Sensor Monitoring Application
An application that monitors digital and analog inputs:
from pydoover.docker import Application, run_app
from pydoover.config import Schema, Integer, Number, Boolean
class SensorConfig(Schema):
    """Configuration schema for the sensor monitoring example."""

    def __init__(self):
        # How many digital input channels the main loop polls (0-8).
        self.num_di = Integer(
            "Digital Input Count",
            default=4,
            min_val=0,
            max_val=8,
        )
        # How many analog input channels the main loop polls (0-4).
        self.num_ai = Integer(
            "Analog Input Count",
            default=2,
            min_val=0,
            max_val=4,
        )
        # Analog readings above this value trigger the threshold handler.
        self.voltage_threshold = Number("Voltage Threshold", default=3.3)
        # When False, the threshold handler returns without alerting.
        self.enable_alerts = Boolean("Enable Alerts", default=True)
class SensorMonitor(Application):
    """Mirror digital and analog inputs to tags and raise threshold alerts.

    Every loop publishes one ``di_<n>`` tag per digital input and one
    ``ai_<n>`` tag per analog input. When alerts are enabled, the
    ``alert_ai_<n>`` tag is set on the first reading above the configured
    voltage threshold and cleared again once a reading is back at or below
    it, so a later excursion raises a fresh alert.
    """

    config: SensorConfig

    def setup(self):
        """Mark monitoring as active and start with no alerts raised."""
        self.set_tag("monitoring_active", True)
        # Maps alert tag name -> whether that alert is currently raised.
        self.alert_states = {}

    def main_loop(self):
        """Poll every configured input, publish its tag and check thresholds."""
        # Monitor digital inputs
        for i in range(self.config.num_di.value):
            state = self.get_di(i)
            self.set_tag(f"di_{i}", state)
            if state:
                print(f"Digital Input {i}: ACTIVE")

        # Monitor analog inputs
        threshold = self.config.voltage_threshold.value
        for i in range(self.config.num_ai.value):
            voltage = self.get_ai(i)
            self.set_tag(f"ai_{i}", voltage)

            # Compare against None explicitly: a plain truthiness test would
            # also skip a legitimate 0 V reading.
            # NOTE(review): assumes get_ai returns None when no reading is
            # available - confirm against the platform interface.
            if voltage is None:
                continue

            if voltage > threshold:
                self.handle_threshold_exceeded(i, voltage)
            else:
                self._clear_alert(i)

    def handle_threshold_exceeded(self, channel, voltage):
        """Raise the alert for ``channel`` once per excursion.

        Args:
            channel: Index of the analog input that exceeded the threshold.
            voltage: The reading that exceeded the threshold.
        """
        if not self.config.enable_alerts.value:
            return

        alert_key = f"alert_ai_{channel}"
        # Only announce on the transition into the alert condition.
        if not self.alert_states.get(alert_key):
            print(f"ALERT: Analog {channel} exceeded threshold: {voltage}V")
            self.set_tag(alert_key, True)
            self.alert_states[alert_key] = True

    def _clear_alert(self, channel):
        """Clear a raised alert for ``channel`` after the reading recovers."""
        alert_key = f"alert_ai_{channel}"
        if self.alert_states.get(alert_key):
            self.set_tag(alert_key, False)
            self.alert_states[alert_key] = False
if __name__ == "__main__":
    # Build the monitor with its schema, then hand it to the runner.
    app = SensorMonitor(config=SensorConfig())
    run_app(app)
Relay Control Application
An application that controls digital outputs based on inputs:
from pydoover.docker import Application, run_app
from pydoover.config import Schema, Integer, Boolean
class RelayConfig(Schema):
    """Configuration schema for the relay control example."""

    def __init__(self):
        # Digital input whose state drives the relay.
        self.input_pin = Integer("Input Pin", default=0)
        # Digital output that is switched.
        self.output_pin = Integer("Output Pin", default=0)
        # When True, the output is the logical negation of the input.
        self.invert_logic = Boolean("Invert Logic", default=False)
        # Seconds after switching on before an automatic off is scheduled;
        # values of 0 or less schedule nothing.
        self.auto_off_seconds = Integer(
            "Auto-off Delay (0=disabled)",
            default=0,
        )
class RelayController(Application):
    """Drive one digital output from one digital input, optionally inverted."""

    config: RelayConfig

    def setup(self):
        """Force the output off at startup and publish the initial relay state."""
        self.set_do(self.config.output_pin.value, False)
        self.set_tag("relay_state", False)
        self.pending_off = False

    def main_loop(self):
        """Switch the output whenever it disagrees with the desired state."""
        input_state = self.get_di(self.config.input_pin.value)

        # Apply invert logic if configured
        invert = self.config.invert_logic.value
        desired_output = (not input_state) if invert else input_state

        # Nothing to do when the output already matches.
        current_output = self.get_do(self.config.output_pin.value)
        if desired_output == current_output:
            return

        self.set_do(self.config.output_pin.value, desired_output)
        self.set_tag("relay_state", desired_output)
        print(f"Relay {'ON' if desired_output else 'OFF'}")

        # Schedule auto-off if configured
        # NOTE(review): once the scheduled off fires while the input is still
        # active, the next loop will see a mismatch and switch on again -
        # confirm this is the intended behaviour.
        if desired_output and self.config.auto_off_seconds.value > 0:
            self.schedule_do(
                self.config.output_pin.value,
                False,
                self.config.auto_off_seconds.value,
            )
            print(f"Auto-off scheduled in {self.config.auto_off_seconds.value}s")
if __name__ == "__main__":
    # Build the controller with its schema, then hand it to the runner.
    app = RelayController(config=RelayConfig())
    run_app(app)
Flow Meter Application
An application using pulse counting for flow measurement:
from pydoover.docker import Application, run_app
from pydoover.config import Schema, Integer, Number
class FlowConfig(Schema):
    """Configuration schema for the flow meter example."""

    def __init__(self):
        # Digital input the pulse counter is attached to.
        self.pulse_input = Integer("Pulse Input Pin", default=0)
        # Calibration factor used to convert pulses to liters.
        self.pulses_per_liter = Number("Pulses per Liter", default=450.0)
        # Passed to the pulse counter as its rate window.
        self.rate_window = Integer("Rate Window (seconds)", default=60)
class FlowMeter(Application):
    """Measure flow by counting pulses on a digital input."""

    config: FlowConfig

    def setup(self):
        """Create the pulse counter and zero the published totals."""
        counter_options = {
            "di": self.config.pulse_input.value,
            "edge": "rising",
            "callback": self.on_pulse,
            "rate_window_secs": self.config.rate_window.value,
        }
        self.counter = self.platform_iface.get_new_pulse_counter(**counter_options)

        self.total_liters = 0.0
        for tag in ("flow_total_liters", "flow_rate_lpm"):
            self.set_tag(tag, 0.0)

    async def on_pulse(self, pin, value, dt_secs, count, edge):
        """Pulse counter callback: add one pulse's volume to the running total."""
        # Each pulse is worth 1 / pulses_per_liter liters.
        self.total_liters += 1.0 / self.config.pulses_per_liter.value

        # Update tags
        self.set_tag("flow_total_liters", round(self.total_liters, 3))
        self.set_tag("flow_pulse_count", count)

    def main_loop(self):
        """Publish and print the current flow rate in liters per minute."""
        pulse_rate = self.counter.get_pulses_per_minute()
        liters_per_minute = pulse_rate / self.config.pulses_per_liter.value

        self.set_tag("flow_rate_lpm", round(liters_per_minute, 2))
        print(f"Flow: {liters_per_minute:.2f} L/min, Total: {self.total_liters:.2f} L")
if __name__ == "__main__":
    # Build the flow meter with its schema, then hand it to the runner.
    app = FlowMeter(config=FlowConfig())
    run_app(app)
Modbus Sensor Reader
An application that reads from Modbus sensors:
from pydoover.docker import Application, run_app, ModbusConfig
from pydoover.config import Schema, Integer
class ModbusSensorConfig(Schema):
    """Configuration schema for the single-sensor Modbus example."""

    def __init__(self):
        self.modbus_config = ModbusConfig("Modbus Settings")

        # Override the bus defaults for a serial sensor bus.
        bus = self.modbus_config
        bus.type.default = "serial"
        bus.name.default = "sensor_bus"
        bus.serial_port.default = "/dev/ttyAMA0"
        bus.serial_baud.default = 9600

        # Modbus ID of the sensor, restricted to 1-247.
        self.sensor_id = Integer(
            "Sensor Modbus ID",
            default=1,
            min_val=1,
            max_val=247,
        )
        # Passed to the register subscription as its poll period.
        self.poll_seconds = Integer("Poll Interval", default=10)
class ModbusSensorReader(Application):
    """Read four registers from one Modbus sensor and publish them as tags."""

    config: ModbusSensorConfig

    def setup(self):
        """Subscribe to the sensor's registers and flag the sensor as initializing."""
        subscription = {
            "bus_id": "sensor_bus",
            "modbus_id": self.config.sensor_id.value,
            "start_address": 0,
            "num_registers": 4,
            "register_type": 3,  # Input registers
            "poll_secs": self.config.poll_seconds.value,
            "callback": self.on_sensor_data,
        }
        self.modbus_iface.add_read_register_subscription(**subscription)

        self.set_tag("sensor_status", "initializing")

    def on_sensor_data(self, values):
        """Subscription callback: decode the register values into tags.

        A ``None`` result is treated as a failed read and marks the sensor
        offline.
        """
        if values is None:
            self.set_tag("sensor_status", "offline")
            print("Failed to read sensor")
            return

        # Parse sensor values (example: temperature/humidity sensor)
        temperature = values[0] / 10.0
        humidity = values[1] / 10.0
        pressure = values[2]
        battery = values[3] / 100.0

        # Publish in a fixed order, finishing with the status tag.
        readings = {
            "temperature": temperature,
            "humidity": humidity,
            "pressure": pressure,
            "battery_voltage": battery,
            "sensor_status": "online",
        }
        for tag_name, tag_value in readings.items():
            self.set_tag(tag_name, tag_value)

        print(f"Temp: {temperature}C, RH: {humidity}%, Press: {pressure}hPa")

    def main_loop(self):
        """Nothing to do here: Modbus readings arrive via the subscription."""
        # Main loop can handle other tasks
        pass
if __name__ == "__main__":
    # Build the reader with its schema, then hand it to the runner.
    app = ModbusSensorReader(config=ModbusSensorConfig())
    run_app(app)
Cloud-Connected Application
An application with cloud channel subscriptions:
from pydoover.docker import Application, run_app
from pydoover.config import Schema, Boolean
import time
class CloudConfig(Schema):
    """Configuration schema for the cloud-connected example."""

    def __init__(self):
        # When False, the command handler ignores every incoming command.
        self.accept_commands = Boolean(
            "Accept Remote Commands",
            default=True,
        )
class CloudConnectedApp(Application):
    """Handle commands from a cloud channel and publish periodic telemetry."""

    config: CloudConfig

    def setup(self):
        """Subscribe to the command channel and initialise bookkeeping."""
        # Monotonic reference point so the reported uptime is unaffected by
        # wall-clock adjustments.
        self._started_at = time.monotonic()

        # Subscribe to command channel
        self.device_agent.add_subscription("commands", self.on_command)

        # Track last command time
        self.last_command_time = None
        self.set_tag("status", "ready")

    async def on_command(self, channel_name, data):
        """Channel callback: dispatch one incoming command message.

        Args:
            channel_name: Name of the channel the message arrived on.
            data: Message body; expected to be a dict with a ``command`` key
                and an optional ``payload`` dict.
        """
        if not self.config.accept_commands.value:
            print("Commands disabled")
            return

        # Messages come from outside the device, so do not assume their shape.
        if not isinstance(data, dict):
            print(f"Ignoring malformed command message: {data!r}")
            return

        command = data.get("command")
        # Treat a missing or null payload as an empty one.
        payload = data.get("payload") or {}

        print(f"Received command: {command}")
        self.last_command_time = time.time()

        if command == "set_output":
            pin = payload.get("pin", 0)
            value = payload.get("value", False)
            self.set_do(pin, value)
            self.set_tag("last_command", f"set_output:{pin}={value}")
        elif command == "get_status":
            # Publish status to response channel
            status = {
                "online": self.device_agent.get_is_dda_online(),
                # Seconds elapsed since setup(), not an absolute timestamp.
                "uptime": time.monotonic() - self._started_at,
                "di_states": [self.get_di(i) for i in range(4)],
            }
            self.device_agent.publish_to_channel("status_response", status)
        elif command == "restart":
            self.set_tag("status", "restarting")
            # Handle restart logic
        else:
            print(f"Unknown command: {command}")

    def main_loop(self):
        """Publish a telemetry snapshot and refresh the connection tag."""
        # Publish periodic telemetry
        telemetry = {
            "timestamp": time.time(),
            "di": [self.get_di(i) for i in range(4)],
            "ai": [self.get_ai(i) for i in range(2)],
            "cloud_connected": self.device_agent.get_is_dda_online(),
        }
        self.device_agent.publish_to_channel("telemetry", telemetry)

        # Update connection status tag
        if self.device_agent.get_is_dda_online():
            self.set_tag("connection", "online")
        else:
            self.set_tag("connection", "offline")
if __name__ == "__main__":
    # Build the application with its schema, then hand it to the runner.
    app = CloudConnectedApp(config=CloudConfig())
    run_app(app)
Multi-Device Modbus Application
An application managing multiple Modbus devices:
from pydoover.docker import Application, run_app, ManyModbusConfig, ModbusConfig
from pydoover.config import Schema
class MultiModbusConfig(Schema):
    """Configuration schema holding the Modbus bus definitions."""

    def __init__(self):
        # The application below addresses buses named "sensors" and "plc".
        buses = ManyModbusConfig("Modbus Buses")
        self.modbus_config = buses
class MultiDeviceApp(Application):
    """Poll three Modbus devices across two buses and forward a PLC setpoint."""

    config: MultiModbusConfig

    def setup(self):
        """Register one read-register subscription per device."""
        subscriptions = (
            # Device 1: Temperature sensor on bus "sensors"
            {
                "bus_id": "sensors",
                "modbus_id": 1,
                "start_address": 0,
                "num_registers": 2,
                "register_type": 3,
                "poll_secs": 10,
                "callback": self.on_temp_sensor,
            },
            # Device 2: Power meter on bus "sensors"
            {
                "bus_id": "sensors",
                "modbus_id": 2,
                "start_address": 0,
                "num_registers": 4,
                "register_type": 4,
                "poll_secs": 5,
                "callback": self.on_power_meter,
            },
            # Device 3: PLC on bus "plc"
            {
                "bus_id": "plc",
                "modbus_id": 1,
                "start_address": 100,
                "num_registers": 10,
                "register_type": 4,
                "poll_secs": 1,
                "callback": self.on_plc_data,
            },
        )
        for spec in subscriptions:
            self.modbus_iface.add_read_register_subscription(**spec)

    def on_temp_sensor(self, values):
        """Publish ambient temperature and humidity; ignore failed reads."""
        if values is None:
            return

        temp, humidity = values[0] / 10.0, values[1] / 10.0
        self.set_tag("ambient_temp", temp)
        self.set_tag("ambient_humidity", humidity)

    def on_power_meter(self, values):
        """Publish voltage, current, power and energy; ignore failed reads."""
        if values is None:
            return

        measurements = {
            "voltage": values[0] / 10.0,
            "current": values[1] / 100.0,
            "power": values[2],
            "energy": values[3],
        }
        for tag_name, tag_value in measurements.items():
            self.set_tag(tag_name, tag_value)

    def on_plc_data(self, values):
        """Publish each PLC status register under its register number."""
        if values is None:
            return

        # The subscription starts at register 100, hence the offset.
        for i, val in enumerate(values):
            self.set_tag(f"plc_reg_{100+i}", val)

    def main_loop(self):
        """Write a pending setpoint to the PLC, then mark it as applied."""
        new_setpoint = self.get_tag("new_setpoint")
        if new_setpoint is None:
            return

        self.modbus_iface.write_registers(
            bus_id="plc",
            modbus_id=1,
            start_address=200,
            values=[int(new_setpoint)],
            register_type=4,
        )
        # Clear the request before recording the value as current.
        self.set_tag("new_setpoint", None)
        self.set_tag("current_setpoint", new_setpoint)
if __name__ == "__main__":
    # Build the application with its schema, then hand it to the runner.
    app = MultiDeviceApp(config=MultiModbusConfig())
    run_app(app)
Async Application
An application using async patterns:
import asyncio
from pydoover.docker import Application, run_app
from pydoover.config import Schema
class AsyncApp(Application):
    """Application whose setup and main loop are coroutines."""

    async def setup(self):
        """Await sensor initialization, then flag the app as initialized."""
        await self.initialize_sensors()
        self.set_tag("initialized", True)

    async def initialize_sensors(self):
        """Simulate sensor initialization with a one-second async wait."""
        print("Initializing sensors...")
        await asyncio.sleep(1)
        print("Sensors ready")

    async def main_loop(self):
        """Read three sensors concurrently and publish each successful value."""
        pending_reads = [self.read_sensor_async(sensor_id) for sensor_id in range(3)]
        # return_exceptions=True delivers failures as results instead of raising.
        results = await asyncio.gather(*pending_reads, return_exceptions=True)

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Sensor {i} error: {result}")
                continue
            await self.set_tag_async(f"sensor_{i}", result)

    async def read_sensor_async(self, sensor_id):
        """Return the analog reading for ``sensor_id`` via the platform interface."""
        return await self.platform_iface.get_ai_async(sensor_id)
if __name__ == "__main__":
    # This example needs no settings, so a bare Schema is used.
    app = AsyncApp(config=Schema())
    run_app(app)
Application with Shutdown Handling
An application with proper shutdown handling:
from datetime import datetime
from pydoover.docker import Application, run_app
from pydoover.config import Schema
class SafeShutdownApp(Application):
    """Application that refuses shutdown until it is in a safe state."""

    def setup(self):
        """Start with no critical operation in progress."""
        self.critical_operation_running = False
        self.set_tag("status", "running")

    async def on_shutdown_at(self, dt: datetime):
        """Called when shutdown is scheduled."""
        print(f"Shutdown scheduled for {dt}")
        self.set_tag("status", "shutdown_pending")

        # Finish any critical operations
        if self.critical_operation_running:
            print("Waiting for critical operation to complete...")

    async def check_can_shutdown(self) -> bool:
        """Return True only when no critical work runs and all outputs are off.

        If any of the first four outputs is active, every one of them is
        switched off and False is returned for this check.
        """
        if self.critical_operation_running:
            print("Cannot shutdown: critical operation in progress")
            return False

        # Ensure outputs are in safe state
        output_channels = range(4)
        do_states = [self.get_do(channel) for channel in output_channels]
        if not any(do_states):
            print("Safe to shutdown")
            return True

        print("Cannot shutdown: outputs still active")
        # Turn off all outputs
        for channel in output_channels:
            self.set_do(channel, False)
        return False

    def main_loop(self):
        """Normal operation; intentionally empty in this example."""
        pass

    def start_critical_operation(self):
        """Mark a critical operation as started and publish the flag."""
        self.critical_operation_running = True
        self.set_tag("critical_op", True)

    def end_critical_operation(self):
        """Mark the critical operation as finished and publish the flag."""
        self.critical_operation_running = False
        self.set_tag("critical_op", False)
if __name__ == "__main__":
    # This example needs no settings, so a bare Schema is used.
    app = SafeShutdownApp(config=Schema())
    run_app(app)
Test Mode Application
Using test mode for controlled testing:
import asyncio
from pydoover.docker import Application, run_app
from pydoover.config import Schema
class TestableApp(Application):
    """Tiny application whose only behaviour is counting loop iterations."""

    def setup(self):
        """Reset the counter and publish its initial value."""
        self.counter = 0
        self.set_tag("counter", 0)

    def main_loop(self):
        """Advance the counter by one and publish the new value."""
        self.counter = self.counter + 1
        self.set_tag("counter", self.counter)
# Test code
async def test_application():
    """Step TestableApp one loop at a time and check the counter tag."""
    app = TestableApp(config=Schema(), test_mode=True)

    # Start app in background
    task = asyncio.create_task(run_app(app, start=False))

    # Wait for ready
    await app.wait_until_ready()
    assert app.is_ready

    # Initial state
    assert app.get_tag("counter") == 0

    # Run one loop iteration
    await app.next()
    assert app.get_tag("counter") == 1

    # Run more iterations
    for _ in range(2):
        await app.next()
    assert app.get_tag("counter") == 3

    # Clean up
    task.cancel()
    print("All tests passed!")
if __name__ == "__main__":
    # Create the test coroutine, then drive it on a fresh event loop.
    test_coroutine = test_application()
    asyncio.run(test_coroutine)
State Machine Integration
For applications with complex operational states, the transitions library provides a powerful pattern for managing state flows. This example from a production motor control application demonstrates timeout-driven transitions and error handling.
from transitions import Machine
from pydoover.docker import Application, run_app
from pydoover.config import Schema, Integer
class MotorConfig(Schema):
    """Configuration schema for the motor controller example."""

    def __init__(self):
        # How long a start attempt may take, in seconds.
        self.start_timeout = Integer(
            "Start Timeout (seconds)",
            default=30,
        )
class MotorController(Application):
    """Motor controller driven by a state machine with timed states.

    The application instance itself is the state machine model, so the
    machine adds ``self.state`` and one method per trigger (for example
    ``self.motor_running()``) to this object.
    """

    config: MotorConfig

    def setup(self):
        """Build the state machine and publish the initial state."""
        # Local import keeps this example self-contained. The core Machine
        # rejects "timeout" / "on_timeout" state arguments; they are provided
        # by the Timeout state feature from the extensions module.
        from transitions.extensions.states import Timeout, add_state_features

        @add_state_features(Timeout)
        class TimeoutMachine(Machine):
            """Machine whose states accept ``timeout`` and ``on_timeout``."""

        # Define states with timeouts
        states = [
            {"name": "ignition_off"},
            # Errors clear themselves after two days.
            {"name": "error", "timeout": 86400 * 2, "on_timeout": "reset_error"},
            # Use the configured start timeout rather than a fixed value.
            {
                "name": "starting_user",
                "timeout": self.config.start_timeout.value,
                "on_timeout": "trigger_error",
            },
            {"name": "running_user"},
            {"name": "stopping_user"},
        ]

        # Define allowed transitions
        transitions = [
            {"trigger": "user_run_start", "source": "ignition_off", "dest": "starting_user"},
            {"trigger": "motor_running", "source": "starting_user", "dest": "running_user"},
            {"trigger": "user_run_stop", "source": "running_user", "dest": "stopping_user"},
            {"trigger": "motor_stopped", "source": "stopping_user", "dest": "ignition_off"},
            {"trigger": "trigger_error", "source": "starting_user", "dest": "error"},
            {"trigger": "reset_error", "source": "error", "dest": "ignition_off"},
        ]

        # Initialize state machine
        # NOTE(review): Timeout callbacks are fired from timer threads, not
        # from the application's main loop - confirm this is acceptable for
        # whatever the transition callbacks end up doing.
        self.machine = TimeoutMachine(
            model=self,
            states=states,
            transitions=transitions,
            initial="ignition_off",
            queued=True,  # Queue transitions for rapid events
        )

        self.set_tag("state", self.state)

    def main_loop(self):
        """Advance the machine from motor feedback and publish the state."""
        # Check motor feedback
        motor_feedback = self.get_di(0)  # Motor running sensor

        if self.state == "starting_user" and motor_feedback:
            self.motor_running()  # Trigger transition
        elif self.state == "stopping_user" and not motor_feedback:
            self.motor_stopped()

        # Publish on every loop so that timeout-driven transitions, which
        # happen outside this method, are reflected in the tag as well.
        self.set_tag("state", self.state)
if __name__ == "__main__":
    # Build the controller with its schema, then hand it to the runner.
    app = MotorController(config=MotorConfig())
    run_app(app)
Source: small-motor-control
The state machine pattern provides:
- Timeout-driven transitions: Automatically fail if motor doesn't start within 30 seconds
- Queued transitions: Handle rapid state changes without race conditions
- Clear state diagrams: States and transitions map directly to code
Advanced State Machine with Callbacks
For production systems requiring sophisticated state management, this example demonstrates entry/exit callbacks, after-transition callbacks, and complex state evaluation logic.
import time

from pydoover.docker import ModbusInterface
from pydoover.ui import UIManager
from transitions import Machine, State
class PumpControllerState:
# Define states with entry/exit callbacks
states = [
State(name="initialising"),
State(name="not_powered", on_exit=["clear_error", "clear_shutdown_reason"]),
State(name="manual_power_not_running",
on_enter=["clear_error", "clear_shutdown_reason"]),
State(name="manual_power_running",
on_enter=["reset_command_to_running", "clear_shutdown_reason"]),
State(name="auto_startup",
on_enter=["save_current_state_enter_time", "clear_error"]),
State(name="auto_running", on_exit="reset_command_to_shutdown"),
State(name="auto_shutdown",
on_enter=["save_current_state_enter_time", "reset_command_to_shutdown"]),
State(name="stabilise_error",
on_enter=["save_current_state_enter_time", "reset_command_to_shutdown"]),
State(name="post_fault",
on_enter="save_current_state_enter_time",
on_exit=["clear_error", "clear_shutdown_reason"]),
]
# Define transitions with after-callbacks for alerting
transitions = [
{"trigger": "initialised", "source": "initialising", "dest": "not_powered"},
{"trigger": "init_auto_start", "source": "not_powered", "dest": "auto_startup"},
{"trigger": "finish_auto_start", "source": "auto_startup", "dest": "auto_running",
"after": "alert_finish_auto_start"},
{"trigger": "init_auto_shutdown", "source": "auto_running", "dest": "auto_shutdown"},
{"trigger": "finish_auto_shutdown", "source": "auto_shutdown", "dest": "not_powered",
"after": "alert_finish_auto_shutdown"},
{"trigger": "register_fault_auto", "source": "auto_running", "dest": "stabilise_error"},
{"trigger": "finish_fault_resolution", "source": "stabilise_error", "dest": "post_fault",
"after": "alert_finish_fault_resolution"},
]
def __init__(self, ui_manager: UIManager, modbus_iface: ModbusInterface):
self.ui_manager = ui_manager
self.modbus_iface = modbus_iface
self.current_state_enter_time = None
self.last_error = None
# Initialize state machine
self.s_machine = Machine(
model=self,
states=self.states,
transitions=self.transitions,
initial="not_powered",
)
def save_current_state_enter_time(self):
self.current_state_enter_time = time.time()
@property
def time_in_state(self):
return time.time() - (self.current_state_enter_time or time.time())
def evaluate_state(self, pump_running_command: bool):
"""Evaluate current state and trigger appropriate transitions."""
match self.state:
case "auto_startup":
if self.time_in_state > 240: # Startup timeout
self.last_error = "Startup Attempt Failed"
self.register_fault_auto()
elif self.k37_state.is_auto and self.k37_state.is_running:
self.finish_auto_start()
case "auto_running":
if not pump_running_command:
self.init_auto_shutdown()
elif self.has_an_error_occurred:
self.register_fault_auto()
def spin_state(self, pump_running_command: bool):
"""Keep spinning until state has stabilised."""
last_state = None
count = 0
while last_state != self.state:
count += 1
if count > 15: # Prevent infinite loops
break
last_state = self.state
self.evaluate_state(pump_running_command)
# Callback implementations
def reset_command_to_shutdown(self):
self.ui_manager.coerce_command("pumpState", "shutdown")
def alert_finish_auto_start(self):
self.ui_manager.publish_to_channel(
"significantEvent",
f"{self.name} has been started remotely"
)
Source: pump_station_controller
Key features of this advanced pattern:
- Entry/exit callbacks: Automatically execute cleanup or setup when entering/leaving states
- After-transition callbacks: Send alerts after successful transitions
- Time-based state evaluation: Track time in state for timeout handling
- State spin loop: Evaluate and re-evaluate until state stabilizes
Project Structure
A recommended project structure for pydoover applications:
my_app/
├── __init__.py
├── application.py # Main Application class
├── app_config.py # Configuration schema
├── handlers/ # Command and event handlers
│ ├── __init__.py
│ └── commands.py
├── sensors/ # Sensor-specific code
│ ├── __init__.py
│ └── temperature.py
└── utils/ # Utility functions
├── __init__.py
└── conversions.py
__init__.py:
from pydoover.docker import run_app
from .application import MyApp
from .app_config import MyConfig
def main():
    """Package entry point: build MyApp with MyConfig and run it."""
    app = MyApp(config=MyConfig())
    run_app(app)
application.py:
from pydoover.docker import Application
from .app_config import MyConfig
class MyApp(Application):
    """Application skeleton; fill in setup and main_loop."""

    config: MyConfig

    def setup(self):
        """One-time initialization hook (empty in this template)."""
        return None

    def main_loop(self):
        """Periodic work hook (empty in this template)."""
        return None
app_config.py:
from pydoover.config import Schema, Integer, String, Boolean
class MyConfig(Schema):
    """Configuration schema template with one setting of each basic type."""

    def __init__(self):
        # Integer setting.
        self.setting1 = Integer("Setting 1", default=10)
        # String setting.
        self.setting2 = String("Setting 2", default="value")
        # Boolean setting.
        self.enabled = Boolean("Enabled", default=True)