Skip to content

Examples

This page provides three complete Docker application examples, progressing from simple to comprehensive. Each example is a self-contained application that can be deployed to a Doovit device.

Example 1: Simple I/O Control

This application reads digital inputs and controls digital outputs based on configurable rules. It demonstrates the fundamentals: configuration, the main loop, hardware I/O, and tag persistence.

What it does: Monitors a door sensor on digital input 0 and controls an alarm relay on digital output 0. When the door is open for longer than a configurable delay, the alarm activates. The operator can view the door state and alarm state in the web portal.

from pydoover.docker import Application, run_app
from pydoover import config
from pydoover.tags import Tags, Boolean, Number
from pydoover.ui import UI, BooleanVariable, NumericVariable, tag_ref
import time


class Config(config.Schema):
    alarm_delay = config.Number(
        "Alarm Delay", default=30, description="Seconds door must be open before alarm triggers"
    )
    alarm_enabled = config.Boolean(
        "Alarm Enabled", default=True, description="Enable the door alarm"
    )


class AppTags(Tags):
    door_open = Boolean(default=False)
    alarm_active = Boolean(default=False)
    door_open_duration = Number(default=0.0)


class AppUI(UI):
    door_status = BooleanVariable("Door", value=AppTags.door_open)
    alarm_status = BooleanVariable("Alarm", value=AppTags.alarm_active)
    open_duration = NumericVariable("Open Duration", units="s", precision=0)


class DoorAlarmApp(Application):
    config_cls = Config
    tags_cls = AppTags
    ui_cls = AppUI

    def setup(self):
        self.loop_target_period = 1
        self.door_opened_at = None

    def main_loop(self):
        # Read the door sensor (True = open)
        door_open = self.platform_iface.fetch_di(0)
        self.set_tag("door_open", door_open)

        # Track how long the door has been open
        if door_open:
            if self.door_opened_at is None:
                self.door_opened_at = time.time()
            duration = time.time() - self.door_opened_at
            self.set_tag("door_open_duration", duration)
        else:
            self.door_opened_at = None
            self.set_tag("door_open_duration", 0.0)
            duration = 0.0

        # Activate alarm if door has been open too long
        should_alarm = (
            self.config.alarm_enabled
            and door_open
            and duration > self.config.alarm_delay
        )
        self.set_tag("alarm_active", should_alarm)

        # Drive the relay
        self.platform_iface.set_do(0, should_alarm)

    def on_shutdown_at(self, dt):
        # Ensure relay is off on shutdown
        self.platform_iface.set_do(0, False)
        self.set_tag("alarm_active", False)


if __name__ == "__main__":
    run_app(DoorAlarmApp)

Key points:

  • Config defines two user-editable settings that appear in the web portal.
  • AppTags provides persistent state that survives application restarts.
  • AppUI declares variables that display live data in the portal.
  • The main_loop() runs every second, reading the sensor and applying the alarm logic.
  • On shutdown, the relay is set to a safe state (off).

Example 2: Modbus Sensor Application

This application reads data from a Modbus RTU device (an energy meter) and publishes readings to the Doover cloud. It demonstrates Modbus bus management, register reading, 32-bit float conversion, and polling subscriptions.

What it does: Connects to an energy meter over Modbus RTU, reads voltage, current, and power registers every 10 seconds, converts the raw register values to engineering units, and publishes the data as tags and channel messages.

from pydoover.docker import Application, run_app
from pydoover.docker.modbus import ModbusConfig, ModbusInterface
from pydoover import config
from pydoover.tags import Tags, Number
from pydoover.ui import UI, NumericVariable


class Config(config.Schema):
    modbus = ModbusConfig()
    device_address = config.Number("Device Address", default=1, description="Modbus device address (1-247)")
    poll_interval = config.Number("Poll Interval", default=10, description="Seconds between reads")


class MeterTags(Tags):
    voltage = Number(default=0.0)
    current = Number(default=0.0)
    power = Number(default=0.0)
    energy_total = Number(default=0.0)


class MeterUI(UI):
    voltage_display = NumericVariable("Voltage", units="V", precision=1)
    current_display = NumericVariable("Current", units="A", precision=2)
    power_display = NumericVariable("Power", units="W", precision=0)
    energy_display = NumericVariable("Total Energy", units="kWh", precision=1)


class EnergyMeterApp(Application):
    config_cls = Config
    tags_cls = MeterTags
    ui_cls = MeterUI

    def setup(self):
        self.loop_target_period = self.config.poll_interval

        # Open the Modbus serial bus
        self.bus_id = self.modbus_iface.open_bus(
            bus_type="serial",
            name="energy_meter",
            serial_port=self.config.modbus.serial_port,
            serial_baud=self.config.modbus.serial_baud,
        )

        self.device_addr = int(self.config.device_address)
        self.set_tag("status", "online", log=True)

    def read_float_register(self, start_address):
        """Read two consecutive input registers and convert to a 32-bit float."""
        regs = self.modbus_iface.read_registers(
            bus_id=self.bus_id,
            modbus_id=self.device_addr,
            start_address=start_address,
            num_registers=2,
            register_type=3,  # Input registers
        )
        return ModbusInterface.two_words_to_32bit_float(regs[0], regs[1])

    def main_loop(self):
        # Read measurements from the energy meter
        # Register map (device-specific):
        #   0-1: Voltage (V)
        #   2-3: Current (A)
        #   4-5: Active Power (W)
        #   6-7: Total Energy (kWh)
        voltage = self.read_float_register(0)
        current = self.read_float_register(2)
        power = self.read_float_register(4)
        energy = self.read_float_register(6)

        # Update tags
        self.set_tag("voltage", round(voltage, 1))
        self.set_tag("current", round(current, 2))
        self.set_tag("power", round(power, 0))
        self.set_tag("energy_total", round(energy, 1))

        # Publish telemetry message
        self.create_message("energy_telemetry", {
            "voltage": voltage,
            "current": current,
            "power": power,
            "energy_total": energy,
        })

    def on_shutdown_at(self, dt):
        self.modbus_iface.close_bus(self.bus_id)
        self.set_tag("status", "offline", log=True)


if __name__ == "__main__":
    run_app(EnergyMeterApp)

Key points:

  • ModbusConfig() in the config schema lets operators configure the serial port and baud rate through the web portal without code changes.
  • read_float_register() is a helper that encapsulates the two-step process of reading two 16-bit registers and converting them to a float.
  • The register map (addresses 0-7) is device-specific. Always refer to the connected device's documentation for correct register addresses and data types.
  • The bus is closed cleanly during shutdown.

Using Polling Subscriptions

As an alternative to reading registers in the main loop, you can use polling subscriptions. These are particularly useful when you want the Modbus Service to handle timing independently.

def setup(self):
    self.bus_id = self.modbus_iface.open_bus(
        bus_type="serial",
        name="energy_meter",
        serial_port=self.config.modbus.serial_port,
        serial_baud=self.config.modbus.serial_baud,
    )

    # Set up automatic polling every 10 seconds
    self.modbus_iface.add_read_register_subscription(
        bus_id=self.bus_id,
        modbus_id=int(self.config.device_address),
        start_address=0,
        num_registers=8,  # Read all 4 float values at once
        register_type=3,
        poll_secs=10,
        callback=self.on_meter_data,
    )

def on_meter_data(self, values):
    """Called automatically every 10 seconds with fresh register data."""
    voltage = ModbusInterface.two_words_to_32bit_float(values[0], values[1])
    current = ModbusInterface.two_words_to_32bit_float(values[2], values[3])
    power = ModbusInterface.two_words_to_32bit_float(values[4], values[5])
    energy = ModbusInterface.two_words_to_32bit_float(values[6], values[7])

    self.set_tag("voltage", round(voltage, 1))
    self.set_tag("current", round(current, 2))
    self.set_tag("power", round(power, 0))
    self.set_tag("energy_total", round(energy, 1))

The subscription reads all 8 registers in a single transaction and delivers them to the callback. This reduces the number of Modbus transactions and keeps the main loop free for other logic.

Example 3: Full-Featured Application

This application demonstrates a complete real-world pattern: configuration with multiple sections, tags with auto-logging, interactive UI elements, channel subscriptions for receiving cloud commands, and notifications.

What it does: Manages a water pump station. It monitors tank level via an analog sensor, controls a pump via a digital output, accepts remote commands (start/stop) from the web portal, and sends notifications for alarm conditions.

from pydoover.docker import Application, run_app
from pydoover import config
from pydoover.tags import Tags, Number, Boolean, String, Delta
from pydoover.ui import (
    UI,
    NumericVariable,
    BooleanVariable,
    TextVariable,
    Switch,
    Slider,
    Submodule,
)


class Config(config.Schema):
    # Tank parameters
    tank_capacity = config.Number("Tank Capacity", default=10000, description="Tank capacity in litres")
    sensor_min_ma = config.Number("Sensor Min mA", default=4.0, description="Sensor mA at empty tank")
    sensor_max_ma = config.Number("Sensor Max mA", default=20.0, description="Sensor mA at full tank")

    # Pump control
    pump_start_level = config.Number(
        "Pump Start Level", default=30.0, description="Start pump below this level (%)"
    )
    pump_stop_level = config.Number(
        "Pump Stop Level", default=80.0, description="Stop pump above this level (%)"
    )
    auto_mode = config.Boolean("Auto Mode", default=True, description="Enable automatic pump control")

    # Alarms
    low_level_alarm = config.Number(
        "Low Level Alarm", default=10.0, description="Low level alarm threshold (%)"
    )
    high_level_alarm = config.Number(
        "High Level Alarm", default=95.0, description="High level alarm threshold (%)"
    )

    # Notifications
    notification_topic = config.String(
        "Notification Topic", default="pump_station", description="Notification routing topic"
    )


class StationTags(Tags):
    tank_level = Number(default=0.0, log_on=Delta(amount=5.0))
    pump_running = Boolean(default=False)
    operating_mode = String(default="auto")
    pump_runtime_hours = Number(default=0.0)
    low_level_alarm_active = Boolean(default=False)
    high_level_alarm_active = Boolean(default=False)


class StationUI(UI):
    # Main status display
    tank_level_display = NumericVariable("Tank Level", units="%", precision=1)
    pump_status = BooleanVariable("Pump")
    mode_display = TextVariable("Operating Mode", value="")

    # Operator controls
    pump_switch = Switch("Pump Control")
    setpoint_slider = Slider("Start Level Setpoint", min_val=0, max_val=100)

    # Alarm indicators
    alarm_section = Submodule("Alarms")
    low_alarm = BooleanVariable("Low Level")
    high_alarm = BooleanVariable("High Level")

    # System info
    info_section = Submodule("System")
    runtime_display = NumericVariable("Pump Runtime", units="hrs", precision=1)


class PumpStationApp(Application):
    config_cls = Config
    tags_cls = StationTags
    ui_cls = StationUI

    def setup(self):
        self.loop_target_period = 2  # 2-second control loop

        # Subscribe to command channel for remote control
        self.subscribe("commands", events=["message_create"])

        # Subscribe to pump switch tag for UI interaction
        self.subscribe_to_tag("pump_control", self.on_pump_control)

        # Initialise state
        self.pump_start_time = None
        self.set_tag("operating_mode", "auto" if self.config.auto_mode else "manual")

    def main_loop(self):
        # Read tank level sensor
        raw_ma = self.platform_iface.fetch_ai(0)
        level_pct = self.ma_to_percent(raw_ma)
        self.set_tag("tank_level", round(level_pct, 1))

        # Get current pump state
        pump_on = self.platform_iface.fetch_do(0)
        self.set_tag("pump_running", pump_on)

        # Track pump runtime
        self.update_pump_runtime(pump_on)

        # Run control logic based on operating mode
        mode = self.get_tag("operating_mode")
        if mode == "auto":
            self.auto_control(level_pct, pump_on)

        # Check alarm conditions
        self.check_alarms(level_pct)

        # Publish telemetry
        self.create_message("telemetry", {
            "tank_level": level_pct,
            "pump_running": pump_on,
            "mode": mode,
        })

    def ma_to_percent(self, ma_value):
        """Convert 4-20mA sensor reading to tank level percentage."""
        span = self.config.sensor_max_ma - self.config.sensor_min_ma
        if span == 0:
            return 0.0
        pct = (ma_value - self.config.sensor_min_ma) / span * 100.0
        return max(0.0, min(100.0, pct))

    def auto_control(self, level_pct, pump_on):
        """Automatic pump control with hysteresis."""
        if level_pct < self.config.pump_start_level and not pump_on:
            self.platform_iface.set_do(0, True)
        elif level_pct > self.config.pump_stop_level and pump_on:
            self.platform_iface.set_do(0, False)

    def update_pump_runtime(self, pump_on):
        """Track cumulative pump running hours."""
        import time

        if pump_on:
            if self.pump_start_time is None:
                self.pump_start_time = time.time()
            else:
                elapsed_hours = (time.time() - self.pump_start_time) / 3600
                current = self.get_tag("pump_runtime_hours", default=0.0)
                self.set_tag("pump_runtime_hours", current + elapsed_hours)
                self.pump_start_time = time.time()
        else:
            self.pump_start_time = None

    def check_alarms(self, level_pct):
        """Evaluate alarm conditions and send notifications."""
        # Low level alarm
        low_alarm = level_pct < self.config.low_level_alarm
        prev_low = self.get_tag("low_level_alarm_active", default=False)
        self.set_tag("low_level_alarm_active", low_alarm)

        if low_alarm and not prev_low:
            self.send_notification(
                message=f"Tank level critically low: {level_pct:.1f}%",
                title="Low Level Alarm",
                severity="critical",
                topic=self.config.notification_topic,
            )

        # High level alarm
        high_alarm = level_pct > self.config.high_level_alarm
        prev_high = self.get_tag("high_level_alarm_active", default=False)
        self.set_tag("high_level_alarm_active", high_alarm)

        if high_alarm and not prev_high:
            self.send_notification(
                message=f"Tank level critically high: {level_pct:.1f}%",
                title="High Level Alarm",
                severity="critical",
                topic=self.config.notification_topic,
            )

    def on_message_create(self, event):
        """Handle commands received from the cloud."""
        if event.channel_name != "commands":
            return

        action = event.data.get("action")
        if action == "set_mode":
            new_mode = event.data.get("mode", "auto")
            self.set_tag("operating_mode", new_mode)
        elif action == "pump_on":
            self.platform_iface.set_do(0, True)
            self.set_tag("operating_mode", "manual")
        elif action == "pump_off":
            self.platform_iface.set_do(0, False)
            self.set_tag("operating_mode", "manual")
        elif action == "reset_runtime":
            self.set_tag("pump_runtime_hours", 0.0)

    def on_pump_control(self, new_value):
        """Handle pump switch interaction from the web portal UI."""
        self.platform_iface.set_do(0, new_value)
        self.set_tag("operating_mode", "manual")

    def check_can_shutdown(self):
        """Prevent shutdown while pump is running to avoid water hammer."""
        pump_on = self.platform_iface.fetch_do(0)
        if pump_on:
            # Stop the pump before allowing shutdown
            self.platform_iface.set_do(0, False)
            return False  # Defer shutdown for one cycle to let pump stop
        return True

    def on_shutdown_at(self, dt):
        """Clean shutdown: ensure pump is off and update status."""
        self.platform_iface.set_do(0, False)
        self.set_tag("pump_running", False)
        self.set_tag("operating_mode", "shutdown")


if __name__ == "__main__":
    run_app(PumpStationApp)

Key points:

  • Configuration has multiple sections (tank parameters, pump thresholds, alarm thresholds, notification routing) that all appear as editable fields in the web portal.
  • Tags use auto-logging triggers. The tank_level tag has log_on=Delta(amount=5.0), meaning every 5% change is automatically logged for historical analysis.
  • UI includes display variables (tank level, pump status), interactive controls (Switch for pump control, Slider for setpoint), and organisational containers (alarm and system submodules).
  • Channel subscription on "commands" allows the cloud (or other agents/processors) to send control commands like mode changes and pump start/stop.
  • Tag subscription on "pump_control" reacts to the UI switch interaction in real time.
  • Notifications are sent only on alarm transitions (rising edge), not continuously, to avoid flooding operators with repeated alerts.
  • Shutdown is deferred if the pump is running, preventing sudden pump stops that could cause water hammer. The pump is stopped first, and shutdown proceeds on the next cycle.

Next Steps