Skip to content

Application Examples

This page provides complete, working examples of pydoover Docker applications demonstrating various features and patterns.

Basic Application

A minimal application that demonstrates the core structure:

from pydoover.docker import Application, run_app
from pydoover.config import Schema, String, Integer, Boolean

class BasicConfig(Schema):
    def __init__(self):
        self.device_name = String("Device Name", default="sensor-001")
        self.poll_interval = Integer("Poll Interval (seconds)", default=5, min_val=1)
        self.debug_mode = Boolean("Debug Mode", default=False)

class BasicApp(Application):
    config: BasicConfig

    def setup(self):
        """Called once when the application starts."""
        print(f"Starting {self.config.device_name.value}")
        self.set_tag("app_status", "running")
        self.loop_target_period = self.config.poll_interval.value

    def main_loop(self):
        """Called repeatedly at loop_target_period interval."""
        if self.config.debug_mode.value:
            print("Debug: Main loop running")

        # Your application logic here
        self.set_tag("heartbeat", True)

if __name__ == "__main__":
    run_app(BasicApp(config=BasicConfig()))

Sensor Monitoring Application

An application that monitors digital and analog inputs:

from pydoover.docker import Application, run_app
from pydoover.config import Schema, Integer, Number, Boolean

class SensorConfig(Schema):
    def __init__(self):
        self.num_di = Integer("Digital Input Count", default=4, min_val=0, max_val=8)
        self.num_ai = Integer("Analog Input Count", default=2, min_val=0, max_val=4)
        self.voltage_threshold = Number("Voltage Threshold", default=3.3)
        self.enable_alerts = Boolean("Enable Alerts", default=True)

class SensorMonitor(Application):
    config: SensorConfig

    def setup(self):
        self.set_tag("monitoring_active", True)
        self.alert_states = {}

    def main_loop(self):
        # Monitor digital inputs
        for i in range(self.config.num_di.value):
            state = self.get_di(i)
            self.set_tag(f"di_{i}", state)

            if state:
                print(f"Digital Input {i}: ACTIVE")

        # Monitor analog inputs
        for i in range(self.config.num_ai.value):
            voltage = self.get_ai(i)
            self.set_tag(f"ai_{i}", voltage)

            # Check threshold
            if voltage and voltage > self.config.voltage_threshold.value:
                self.handle_threshold_exceeded(i, voltage)

    def handle_threshold_exceeded(self, channel, voltage):
        if not self.config.enable_alerts.value:
            return

        alert_key = f"alert_ai_{channel}"
        if not self.alert_states.get(alert_key):
            print(f"ALERT: Analog {channel} exceeded threshold: {voltage}V")
            self.set_tag(alert_key, True)
            self.alert_states[alert_key] = True

if __name__ == "__main__":
    run_app(SensorMonitor(config=SensorConfig()))

Relay Control Application

An application that controls digital outputs based on inputs:

from pydoover.docker import Application, run_app
from pydoover.config import Schema, Integer, Boolean

class RelayConfig(Schema):
    def __init__(self):
        self.input_pin = Integer("Input Pin", default=0)
        self.output_pin = Integer("Output Pin", default=0)
        self.invert_logic = Boolean("Invert Logic", default=False)
        self.auto_off_seconds = Integer("Auto-off Delay (0=disabled)", default=0)

class RelayController(Application):
    config: RelayConfig

    def setup(self):
        # Ensure output is off at startup
        self.set_do(self.config.output_pin.value, False)
        self.set_tag("relay_state", False)
        self.pending_off = False

    def main_loop(self):
        input_state = self.get_di(self.config.input_pin.value)

        # Apply invert logic if configured
        if self.config.invert_logic.value:
            desired_output = not input_state
        else:
            desired_output = input_state

        # Get current output state
        current_output = self.get_do(self.config.output_pin.value)

        # Only change if different
        if desired_output != current_output:
            self.set_do(self.config.output_pin.value, desired_output)
            self.set_tag("relay_state", desired_output)
            print(f"Relay {'ON' if desired_output else 'OFF'}")

            # Schedule auto-off if configured
            if desired_output and self.config.auto_off_seconds.value > 0:
                self.schedule_do(
                    self.config.output_pin.value,
                    False,
                    self.config.auto_off_seconds.value
                )
                print(f"Auto-off scheduled in {self.config.auto_off_seconds.value}s")

if __name__ == "__main__":
    run_app(RelayController(config=RelayConfig()))

Flow Meter Application

An application using pulse counting for flow measurement:

from pydoover.docker import Application, run_app
from pydoover.config import Schema, Integer, Number

class FlowConfig(Schema):
    def __init__(self):
        self.pulse_input = Integer("Pulse Input Pin", default=0)
        self.pulses_per_liter = Number("Pulses per Liter", default=450.0)
        self.rate_window = Integer("Rate Window (seconds)", default=60)

class FlowMeter(Application):
    config: FlowConfig

    def setup(self):
        # Create pulse counter
        self.counter = self.platform_iface.get_new_pulse_counter(
            di=self.config.pulse_input.value,
            edge="rising",
            callback=self.on_pulse,
            rate_window_secs=self.config.rate_window.value
        )

        self.total_liters = 0.0
        self.set_tag("flow_total_liters", 0.0)
        self.set_tag("flow_rate_lpm", 0.0)

    async def on_pulse(self, pin, value, dt_secs, count, edge):
        # Calculate volume for this pulse
        liters_per_pulse = 1.0 / self.config.pulses_per_liter.value
        self.total_liters += liters_per_pulse

        # Update tags
        self.set_tag("flow_total_liters", round(self.total_liters, 3))
        self.set_tag("flow_pulse_count", count)

    def main_loop(self):
        # Calculate flow rate (liters per minute)
        pulses_per_minute = self.counter.get_pulses_per_minute()
        liters_per_minute = pulses_per_minute / self.config.pulses_per_liter.value

        self.set_tag("flow_rate_lpm", round(liters_per_minute, 2))

        print(f"Flow: {liters_per_minute:.2f} L/min, Total: {self.total_liters:.2f} L")

if __name__ == "__main__":
    run_app(FlowMeter(config=FlowConfig()))

Modbus Sensor Reader

An application that reads from Modbus sensors:

from pydoover.docker import Application, run_app, ModbusConfig
from pydoover.config import Schema, Integer

class ModbusSensorConfig(Schema):
    def __init__(self):
        self.modbus_config = ModbusConfig("Modbus Settings")
        self.modbus_config.type.default = "serial"
        self.modbus_config.name.default = "sensor_bus"
        self.modbus_config.serial_port.default = "/dev/ttyAMA0"
        self.modbus_config.serial_baud.default = 9600

        self.sensor_id = Integer("Sensor Modbus ID", default=1, min_val=1, max_val=247)
        self.poll_seconds = Integer("Poll Interval", default=10)

class ModbusSensorReader(Application):
    config: ModbusSensorConfig

    def setup(self):
        # Subscribe to sensor readings
        self.modbus_iface.add_read_register_subscription(
            bus_id="sensor_bus",
            modbus_id=self.config.sensor_id.value,
            start_address=0,
            num_registers=4,
            register_type=3,  # Input registers
            poll_secs=self.config.poll_seconds.value,
            callback=self.on_sensor_data
        )

        self.set_tag("sensor_status", "initializing")

    def on_sensor_data(self, values):
        if values is None:
            self.set_tag("sensor_status", "offline")
            print("Failed to read sensor")
            return

        # Parse sensor values (example: temperature/humidity sensor)
        temperature = values[0] / 10.0
        humidity = values[1] / 10.0
        pressure = values[2]
        battery = values[3] / 100.0

        # Update tags
        self.set_tag("temperature", temperature)
        self.set_tag("humidity", humidity)
        self.set_tag("pressure", pressure)
        self.set_tag("battery_voltage", battery)
        self.set_tag("sensor_status", "online")

        print(f"Temp: {temperature}C, RH: {humidity}%, Press: {pressure}hPa")

    def main_loop(self):
        # Main loop can handle other tasks
        # Modbus readings happen via subscription
        pass

if __name__ == "__main__":
    run_app(ModbusSensorReader(config=ModbusSensorConfig()))

Cloud-Connected Application

An application with cloud channel subscriptions:

from pydoover.docker import Application, run_app
from pydoover.config import Schema, Boolean
import time

class CloudConfig(Schema):
    def __init__(self):
        self.accept_commands = Boolean("Accept Remote Commands", default=True)

class CloudConnectedApp(Application):
    config: CloudConfig

    def setup(self):
        # Subscribe to command channel
        self.device_agent.add_subscription("commands", self.on_command)

        # Track last command time
        self.last_command_time = None
        self.set_tag("status", "ready")

    async def on_command(self, channel_name, data):
        if not self.config.accept_commands.value:
            print("Commands disabled")
            return

        command = data.get("command")
        payload = data.get("payload", {})

        print(f"Received command: {command}")
        self.last_command_time = time.time()

        if command == "set_output":
            pin = payload.get("pin", 0)
            value = payload.get("value", False)
            self.set_do(pin, value)
            self.set_tag("last_command", f"set_output:{pin}={value}")

        elif command == "get_status":
            # Publish status to response channel
            status = {
                "online": self.device_agent.get_is_dda_online(),
                "uptime": time.time(),
                "di_states": [self.get_di(i) for i in range(4)]
            }
            self.device_agent.publish_to_channel("status_response", status)

        elif command == "restart":
            self.set_tag("status", "restarting")
            # Handle restart logic

        else:
            print(f"Unknown command: {command}")

    def main_loop(self):
        # Publish periodic telemetry
        telemetry = {
            "timestamp": time.time(),
            "di": [self.get_di(i) for i in range(4)],
            "ai": [self.get_ai(i) for i in range(2)],
            "cloud_connected": self.device_agent.get_is_dda_online()
        }

        self.device_agent.publish_to_channel("telemetry", telemetry)

        # Update connection status tag
        if self.device_agent.get_is_dda_online():
            self.set_tag("connection", "online")
        else:
            self.set_tag("connection", "offline")

if __name__ == "__main__":
    run_app(CloudConnectedApp(config=CloudConfig()))

Multi-Device Modbus Application

An application managing multiple Modbus devices:

from pydoover.docker import Application, run_app, ManyModbusConfig, ModbusConfig
from pydoover.config import Schema

class MultiModbusConfig(Schema):
    def __init__(self):
        self.modbus_config = ManyModbusConfig("Modbus Buses")

class MultiDeviceApp(Application):
    config: MultiModbusConfig

    def setup(self):
        # Device 1: Temperature sensor on bus "sensors"
        self.modbus_iface.add_read_register_subscription(
            bus_id="sensors",
            modbus_id=1,
            start_address=0,
            num_registers=2,
            register_type=3,
            poll_secs=10,
            callback=self.on_temp_sensor
        )

        # Device 2: Power meter on bus "sensors"
        self.modbus_iface.add_read_register_subscription(
            bus_id="sensors",
            modbus_id=2,
            start_address=0,
            num_registers=4,
            register_type=4,
            poll_secs=5,
            callback=self.on_power_meter
        )

        # Device 3: PLC on bus "plc"
        self.modbus_iface.add_read_register_subscription(
            bus_id="plc",
            modbus_id=1,
            start_address=100,
            num_registers=10,
            register_type=4,
            poll_secs=1,
            callback=self.on_plc_data
        )

    def on_temp_sensor(self, values):
        if values is None:
            return
        temp = values[0] / 10.0
        humidity = values[1] / 10.0
        self.set_tag("ambient_temp", temp)
        self.set_tag("ambient_humidity", humidity)

    def on_power_meter(self, values):
        if values is None:
            return
        self.set_tag("voltage", values[0] / 10.0)
        self.set_tag("current", values[1] / 100.0)
        self.set_tag("power", values[2])
        self.set_tag("energy", values[3])

    def on_plc_data(self, values):
        if values is None:
            return
        # Process PLC status registers
        for i, val in enumerate(values):
            self.set_tag(f"plc_reg_{100+i}", val)

    def main_loop(self):
        # Write setpoint to PLC if changed
        new_setpoint = self.get_tag("new_setpoint")
        if new_setpoint is not None:
            self.modbus_iface.write_registers(
                bus_id="plc",
                modbus_id=1,
                start_address=200,
                values=[int(new_setpoint)],
                register_type=4
            )
            self.set_tag("new_setpoint", None)
            self.set_tag("current_setpoint", new_setpoint)

if __name__ == "__main__":
    run_app(MultiDeviceApp(config=MultiModbusConfig()))

Async Application

An application using async patterns:

import asyncio
from pydoover.docker import Application, run_app
from pydoover.config import Schema

class AsyncApp(Application):
    async def setup(self):
        """Async setup allows awaiting initialization tasks."""
        await self.initialize_sensors()
        self.set_tag("initialized", True)

    async def initialize_sensors(self):
        """Simulated async sensor initialization."""
        print("Initializing sensors...")
        await asyncio.sleep(1)
        print("Sensors ready")

    async def main_loop(self):
        """Async main loop for concurrent operations."""
        # Run multiple reads concurrently
        results = await asyncio.gather(
            self.read_sensor_async(0),
            self.read_sensor_async(1),
            self.read_sensor_async(2),
            return_exceptions=True
        )

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Sensor {i} error: {result}")
            else:
                await self.set_tag_async(f"sensor_{i}", result)

    async def read_sensor_async(self, sensor_id):
        """Simulated async sensor read."""
        value = await self.platform_iface.get_ai_async(sensor_id)
        return value

if __name__ == "__main__":
    run_app(AsyncApp(config=Schema()))

Application with Shutdown Handling

An application with proper shutdown handling:

from datetime import datetime
from pydoover.docker import Application, run_app
from pydoover.config import Schema

class SafeShutdownApp(Application):
    def setup(self):
        self.critical_operation_running = False
        self.set_tag("status", "running")

    async def on_shutdown_at(self, dt: datetime):
        """Called when shutdown is scheduled."""
        print(f"Shutdown scheduled for {dt}")
        self.set_tag("status", "shutdown_pending")

        # Finish any critical operations
        if self.critical_operation_running:
            print("Waiting for critical operation to complete...")

    async def check_can_shutdown(self) -> bool:
        """Check if safe to shutdown."""
        # Don't shutdown if critical operation is running
        if self.critical_operation_running:
            print("Cannot shutdown: critical operation in progress")
            return False

        # Ensure outputs are in safe state
        do_states = [self.get_do(i) for i in range(4)]
        if any(do_states):
            print("Cannot shutdown: outputs still active")
            # Turn off all outputs
            for i in range(4):
                self.set_do(i, False)
            return False

        print("Safe to shutdown")
        return True

    def main_loop(self):
        # Normal operation
        pass

    def start_critical_operation(self):
        self.critical_operation_running = True
        self.set_tag("critical_op", True)

    def end_critical_operation(self):
        self.critical_operation_running = False
        self.set_tag("critical_op", False)

if __name__ == "__main__":
    run_app(SafeShutdownApp(config=Schema()))

Test Mode Application

Using test mode for controlled testing:

import asyncio
from pydoover.docker import Application, run_app
from pydoover.config import Schema

class TestableApp(Application):
    def setup(self):
        self.counter = 0
        self.set_tag("counter", 0)

    def main_loop(self):
        self.counter += 1
        self.set_tag("counter", self.counter)

# Test code
async def test_application():
    app = TestableApp(config=Schema(), test_mode=True)

    # Start app in background
    runner = run_app(app, start=False)
    task = asyncio.create_task(runner)

    # Wait for ready
    await app.wait_until_ready()
    assert app.is_ready

    # Initial state
    assert app.get_tag("counter") == 0

    # Run one loop iteration
    await app.next()
    assert app.get_tag("counter") == 1

    # Run more iterations
    await app.next()
    await app.next()
    assert app.get_tag("counter") == 3

    # Clean up
    task.cancel()
    print("All tests passed!")

if __name__ == "__main__":
    asyncio.run(test_application())

State Machine Integration

For applications with complex operational states, the transitions library provides a powerful pattern for managing state flows. This example from a production motor control application demonstrates timeout-driven transitions and error handling.

from transitions import Machine
from pydoover.docker import Application, run_app
from pydoover.config import Schema, Integer

class MotorConfig(Schema):
    def __init__(self):
        self.start_timeout = Integer("Start Timeout (seconds)", default=30)

class MotorController(Application):
    config: MotorConfig

    def setup(self):
        # Define states with timeouts
        states = [
            {"name": "ignition_off"},
            {"name": "error", "timeout": 86400*2, "on_timeout": "reset_error"},
            {"name": "starting_user", "timeout": 30, "on_timeout": "trigger_error"},
            {"name": "running_user"},
            {"name": "stopping_user"},
        ]

        # Define allowed transitions
        transitions = [
            {"trigger": "user_run_start", "source": "ignition_off", "dest": "starting_user"},
            {"trigger": "motor_running", "source": "starting_user", "dest": "running_user"},
            {"trigger": "user_run_stop", "source": "running_user", "dest": "stopping_user"},
            {"trigger": "motor_stopped", "source": "stopping_user", "dest": "ignition_off"},
            {"trigger": "trigger_error", "source": "starting_user", "dest": "error"},
            {"trigger": "reset_error", "source": "error", "dest": "ignition_off"},
        ]

        # Initialize state machine
        self.machine = Machine(
            model=self,
            states=states,
            transitions=transitions,
            initial="ignition_off",
            queued=True  # Queue transitions for rapid events
        )

        self.set_tag("state", self.state)

    def main_loop(self):
        # Check motor feedback
        motor_feedback = self.get_di(0)  # Motor running sensor

        if self.state == "starting_user" and motor_feedback:
            self.motor_running()  # Trigger transition
            self.set_tag("state", self.state)

        elif self.state == "stopping_user" and not motor_feedback:
            self.motor_stopped()
            self.set_tag("state", self.state)

if __name__ == "__main__":
    run_app(MotorController(config=MotorConfig()))

Source: small-motor-control

The state machine pattern provides:

  • Timeout-driven transitions: Automatically fail if motor doesn't start within 30 seconds
  • Queued transitions: Handle rapid state changes without race conditions
  • Clear state diagrams: States and transitions map directly to code

Advanced State Machine with Callbacks

For production systems requiring sophisticated state management, this example demonstrates entry/exit callbacks, after-transition callbacks, and complex state evaluation logic.

from transitions import Machine, State
from pydoover.ui import UIManager
from pydoover.docker import ModbusInterface

class PumpControllerState:
    # Define states with entry/exit callbacks
    states = [
        State(name="initialising"),
        State(name="not_powered", on_exit=["clear_error", "clear_shutdown_reason"]),
        State(name="manual_power_not_running",
              on_enter=["clear_error", "clear_shutdown_reason"]),
        State(name="manual_power_running",
              on_enter=["reset_command_to_running", "clear_shutdown_reason"]),
        State(name="auto_startup",
              on_enter=["save_current_state_enter_time", "clear_error"]),
        State(name="auto_running", on_exit="reset_command_to_shutdown"),
        State(name="auto_shutdown",
              on_enter=["save_current_state_enter_time", "reset_command_to_shutdown"]),
        State(name="stabilise_error",
              on_enter=["save_current_state_enter_time", "reset_command_to_shutdown"]),
        State(name="post_fault",
              on_enter="save_current_state_enter_time",
              on_exit=["clear_error", "clear_shutdown_reason"]),
    ]

    # Define transitions with after-callbacks for alerting
    transitions = [
        {"trigger": "initialised", "source": "initialising", "dest": "not_powered"},
        {"trigger": "init_auto_start", "source": "not_powered", "dest": "auto_startup"},
        {"trigger": "finish_auto_start", "source": "auto_startup", "dest": "auto_running",
         "after": "alert_finish_auto_start"},
        {"trigger": "init_auto_shutdown", "source": "auto_running", "dest": "auto_shutdown"},
        {"trigger": "finish_auto_shutdown", "source": "auto_shutdown", "dest": "not_powered",
         "after": "alert_finish_auto_shutdown"},
        {"trigger": "register_fault_auto", "source": "auto_running", "dest": "stabilise_error"},
        {"trigger": "finish_fault_resolution", "source": "stabilise_error", "dest": "post_fault",
         "after": "alert_finish_fault_resolution"},
    ]

    def __init__(self, ui_manager: UIManager, modbus_iface: ModbusInterface):
        self.ui_manager = ui_manager
        self.modbus_iface = modbus_iface
        self.current_state_enter_time = None
        self.last_error = None

        # Initialize state machine
        self.s_machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial="not_powered",
        )

    def save_current_state_enter_time(self):
        self.current_state_enter_time = time.time()

    @property
    def time_in_state(self):
        return time.time() - (self.current_state_enter_time or time.time())

    def evaluate_state(self, pump_running_command: bool):
        """Evaluate current state and trigger appropriate transitions."""
        match self.state:
            case "auto_startup":
                if self.time_in_state > 240:  # Startup timeout
                    self.last_error = "Startup Attempt Failed"
                    self.register_fault_auto()
                elif self.k37_state.is_auto and self.k37_state.is_running:
                    self.finish_auto_start()

            case "auto_running":
                if not pump_running_command:
                    self.init_auto_shutdown()
                elif self.has_an_error_occurred:
                    self.register_fault_auto()

    def spin_state(self, pump_running_command: bool):
        """Keep spinning until state has stabilised."""
        last_state = None
        count = 0
        while last_state != self.state:
            count += 1
            if count > 15:  # Prevent infinite loops
                break
            last_state = self.state
            self.evaluate_state(pump_running_command)

    # Callback implementations
    def reset_command_to_shutdown(self):
        self.ui_manager.coerce_command("pumpState", "shutdown")

    def alert_finish_auto_start(self):
        self.ui_manager.publish_to_channel(
            "significantEvent",
            f"{self.name} has been started remotely"
        )

Source: pump_station_controller

Key features of this advanced pattern:

  • Entry/exit callbacks: Automatically execute cleanup or setup when entering/leaving states
  • After-transition callbacks: Send alerts after successful transitions
  • Time-based state evaluation: Track time in state for timeout handling
  • State spin loop: Evaluate and re-evaluate until state stabilizes

Project Structure

A recommended project structure for pydoover applications:

my_app/
├── __init__.py
├── application.py      # Main Application class
├── app_config.py       # Configuration schema
├── handlers/           # Command and event handlers
│   ├── __init__.py
│   └── commands.py
├── sensors/            # Sensor-specific code
│   ├── __init__.py
│   └── temperature.py
└── utils/              # Utility functions
    ├── __init__.py
    └── conversions.py

init.py:

from pydoover.docker import run_app
from .application import MyApp
from .app_config import MyConfig

def main():
    run_app(MyApp(config=MyConfig()))

application.py:

from pydoover.docker import Application
from .app_config import MyConfig

class MyApp(Application):
    config: MyConfig

    def setup(self):
        pass

    def main_loop(self):
        pass

app_config.py:

from pydoover.config import Schema, Integer, String, Boolean

class MyConfig(Schema):
    def __init__(self):
        self.setting1 = Integer("Setting 1", default=10)
        self.setting2 = String("Setting 2", default="value")
        self.enabled = Boolean("Enabled", default=True)