Examples
This page provides three complete Docker application examples, progressing from simple to comprehensive. Each example is a self-contained application that can be deployed to a Doovit device.
Example 1: Simple I/O Control
This application reads digital inputs and controls digital outputs based on configurable rules. It demonstrates the fundamentals: configuration, the main loop, hardware I/O, and tag persistence.
What it does: Monitors a door sensor on digital input 0 and controls an alarm relay on digital output 0. When the door is open for longer than a configurable delay, the alarm activates. The operator can view the door state and alarm state in the web portal.
from pydoover.docker import Application, run_app
from pydoover import config
from pydoover.tags import Tags, Boolean, Number
from pydoover.ui import UI, BooleanVariable, NumericVariable, tag_ref
import time
class Config(config.Schema):
alarm_delay = config.Number(
"Alarm Delay", default=30, description="Seconds door must be open before alarm triggers"
)
alarm_enabled = config.Boolean(
"Alarm Enabled", default=True, description="Enable the door alarm"
)
class AppTags(Tags):
door_open = Boolean(default=False)
alarm_active = Boolean(default=False)
door_open_duration = Number(default=0.0)
class AppUI(UI):
door_status = BooleanVariable("Door", value=AppTags.door_open)
alarm_status = BooleanVariable("Alarm", value=AppTags.alarm_active)
open_duration = NumericVariable("Open Duration", units="s", precision=0)
class DoorAlarmApp(Application):
config_cls = Config
tags_cls = AppTags
ui_cls = AppUI
def setup(self):
self.loop_target_period = 1
self.door_opened_at = None
def main_loop(self):
# Read the door sensor (True = open)
door_open = self.platform_iface.fetch_di(0)
self.set_tag("door_open", door_open)
# Track how long the door has been open
if door_open:
if self.door_opened_at is None:
self.door_opened_at = time.time()
duration = time.time() - self.door_opened_at
self.set_tag("door_open_duration", duration)
else:
self.door_opened_at = None
self.set_tag("door_open_duration", 0.0)
duration = 0.0
# Activate alarm if door has been open too long
should_alarm = (
self.config.alarm_enabled
and door_open
and duration > self.config.alarm_delay
)
self.set_tag("alarm_active", should_alarm)
# Drive the relay
self.platform_iface.set_do(0, should_alarm)
def on_shutdown_at(self, dt):
# Ensure relay is off on shutdown
self.platform_iface.set_do(0, False)
self.set_tag("alarm_active", False)
if __name__ == "__main__":
run_app(DoorAlarmApp)
Key points:
Configdefines two user-editable settings that appear in the web portal.AppTagsprovides persistent state that survives application restarts.AppUIdeclares variables that display live data in the portal.- The
main_loop()runs every second, reading the sensor and applying the alarm logic. - On shutdown, the relay is set to a safe state (off).
Example 2: Modbus Sensor Application
This application reads data from a Modbus RTU device (an energy meter) and publishes readings to the Doover cloud. It demonstrates Modbus bus management, register reading, 32-bit float conversion, and polling subscriptions.
What it does: Connects to an energy meter over Modbus RTU, reads voltage, current, and power registers every 10 seconds, converts the raw register values to engineering units, and publishes the data as tags and channel messages.
from pydoover.docker import Application, run_app
from pydoover.docker.modbus import ModbusConfig, ModbusInterface
from pydoover import config
from pydoover.tags import Tags, Number
from pydoover.ui import UI, NumericVariable
class Config(config.Schema):
modbus = ModbusConfig()
device_address = config.Number("Device Address", default=1, description="Modbus device address (1-247)")
poll_interval = config.Number("Poll Interval", default=10, description="Seconds between reads")
class MeterTags(Tags):
voltage = Number(default=0.0)
current = Number(default=0.0)
power = Number(default=0.0)
energy_total = Number(default=0.0)
class MeterUI(UI):
voltage_display = NumericVariable("Voltage", units="V", precision=1)
current_display = NumericVariable("Current", units="A", precision=2)
power_display = NumericVariable("Power", units="W", precision=0)
energy_display = NumericVariable("Total Energy", units="kWh", precision=1)
class EnergyMeterApp(Application):
config_cls = Config
tags_cls = MeterTags
ui_cls = MeterUI
def setup(self):
self.loop_target_period = self.config.poll_interval
# Open the Modbus serial bus
self.bus_id = self.modbus_iface.open_bus(
bus_type="serial",
name="energy_meter",
serial_port=self.config.modbus.serial_port,
serial_baud=self.config.modbus.serial_baud,
)
self.device_addr = int(self.config.device_address)
self.set_tag("status", "online", log=True)
def read_float_register(self, start_address):
"""Read two consecutive input registers and convert to a 32-bit float."""
regs = self.modbus_iface.read_registers(
bus_id=self.bus_id,
modbus_id=self.device_addr,
start_address=start_address,
num_registers=2,
register_type=3, # Input registers
)
return ModbusInterface.two_words_to_32bit_float(regs[0], regs[1])
def main_loop(self):
# Read measurements from the energy meter
# Register map (device-specific):
# 0-1: Voltage (V)
# 2-3: Current (A)
# 4-5: Active Power (W)
# 6-7: Total Energy (kWh)
voltage = self.read_float_register(0)
current = self.read_float_register(2)
power = self.read_float_register(4)
energy = self.read_float_register(6)
# Update tags
self.set_tag("voltage", round(voltage, 1))
self.set_tag("current", round(current, 2))
self.set_tag("power", round(power, 0))
self.set_tag("energy_total", round(energy, 1))
# Publish telemetry message
self.create_message("energy_telemetry", {
"voltage": voltage,
"current": current,
"power": power,
"energy_total": energy,
})
def on_shutdown_at(self, dt):
self.modbus_iface.close_bus(self.bus_id)
self.set_tag("status", "offline", log=True)
if __name__ == "__main__":
run_app(EnergyMeterApp)
Key points:
ModbusConfig()in the config schema lets operators configure the serial port and baud rate through the web portal without code changes.read_float_register()is a helper that encapsulates the two-step process of reading two 16-bit registers and converting them to a float.- The register map (addresses 0-7) is device-specific. Always refer to the connected device's documentation for correct register addresses and data types.
- The bus is closed cleanly during shutdown.
Using Polling Subscriptions
As an alternative to reading registers in the main loop, you can use polling subscriptions. These are particularly useful when you want the Modbus Service to handle timing independently.
def setup(self):
self.bus_id = self.modbus_iface.open_bus(
bus_type="serial",
name="energy_meter",
serial_port=self.config.modbus.serial_port,
serial_baud=self.config.modbus.serial_baud,
)
# Set up automatic polling every 10 seconds
self.modbus_iface.add_read_register_subscription(
bus_id=self.bus_id,
modbus_id=int(self.config.device_address),
start_address=0,
num_registers=8, # Read all 4 float values at once
register_type=3,
poll_secs=10,
callback=self.on_meter_data,
)
def on_meter_data(self, values):
"""Called automatically every 10 seconds with fresh register data."""
voltage = ModbusInterface.two_words_to_32bit_float(values[0], values[1])
current = ModbusInterface.two_words_to_32bit_float(values[2], values[3])
power = ModbusInterface.two_words_to_32bit_float(values[4], values[5])
energy = ModbusInterface.two_words_to_32bit_float(values[6], values[7])
self.set_tag("voltage", round(voltage, 1))
self.set_tag("current", round(current, 2))
self.set_tag("power", round(power, 0))
self.set_tag("energy_total", round(energy, 1))
The subscription reads all 8 registers in a single transaction and delivers them to the callback. This reduces the number of Modbus transactions and keeps the main loop free for other logic.
Example 3: Full-Featured Application
This application demonstrates a complete real-world pattern: configuration with multiple sections, tags with auto-logging, interactive UI elements, channel subscriptions for receiving cloud commands, and notifications.
What it does: Manages a water pump station. It monitors tank level via an analog sensor, controls a pump via a digital output, accepts remote commands (start/stop) from the web portal, and sends notifications for alarm conditions.
from pydoover.docker import Application, run_app
from pydoover import config
from pydoover.tags import Tags, Number, Boolean, String, Delta
from pydoover.ui import (
UI,
NumericVariable,
BooleanVariable,
TextVariable,
Switch,
Slider,
Submodule,
)
class Config(config.Schema):
# Tank parameters
tank_capacity = config.Number("Tank Capacity", default=10000, description="Tank capacity in litres")
sensor_min_ma = config.Number("Sensor Min mA", default=4.0, description="Sensor mA at empty tank")
sensor_max_ma = config.Number("Sensor Max mA", default=20.0, description="Sensor mA at full tank")
# Pump control
pump_start_level = config.Number(
"Pump Start Level", default=30.0, description="Start pump below this level (%)"
)
pump_stop_level = config.Number(
"Pump Stop Level", default=80.0, description="Stop pump above this level (%)"
)
auto_mode = config.Boolean("Auto Mode", default=True, description="Enable automatic pump control")
# Alarms
low_level_alarm = config.Number(
"Low Level Alarm", default=10.0, description="Low level alarm threshold (%)"
)
high_level_alarm = config.Number(
"High Level Alarm", default=95.0, description="High level alarm threshold (%)"
)
# Notifications
notification_topic = config.String(
"Notification Topic", default="pump_station", description="Notification routing topic"
)
class StationTags(Tags):
tank_level = Number(default=0.0, log_on=Delta(amount=5.0))
pump_running = Boolean(default=False)
operating_mode = String(default="auto")
pump_runtime_hours = Number(default=0.0)
low_level_alarm_active = Boolean(default=False)
high_level_alarm_active = Boolean(default=False)
class StationUI(UI):
# Main status display
tank_level_display = NumericVariable("Tank Level", units="%", precision=1)
pump_status = BooleanVariable("Pump")
mode_display = TextVariable("Operating Mode", value="")
# Operator controls
pump_switch = Switch("Pump Control")
setpoint_slider = Slider("Start Level Setpoint", min_val=0, max_val=100)
# Alarm indicators
alarm_section = Submodule("Alarms")
low_alarm = BooleanVariable("Low Level")
high_alarm = BooleanVariable("High Level")
# System info
info_section = Submodule("System")
runtime_display = NumericVariable("Pump Runtime", units="hrs", precision=1)
class PumpStationApp(Application):
config_cls = Config
tags_cls = StationTags
ui_cls = StationUI
def setup(self):
self.loop_target_period = 2 # 2-second control loop
# Subscribe to command channel for remote control
self.subscribe("commands", events=["message_create"])
# Subscribe to pump switch tag for UI interaction
self.subscribe_to_tag("pump_control", self.on_pump_control)
# Initialise state
self.pump_start_time = None
self.set_tag("operating_mode", "auto" if self.config.auto_mode else "manual")
def main_loop(self):
# Read tank level sensor
raw_ma = self.platform_iface.fetch_ai(0)
level_pct = self.ma_to_percent(raw_ma)
self.set_tag("tank_level", round(level_pct, 1))
# Get current pump state
pump_on = self.platform_iface.fetch_do(0)
self.set_tag("pump_running", pump_on)
# Track pump runtime
self.update_pump_runtime(pump_on)
# Run control logic based on operating mode
mode = self.get_tag("operating_mode")
if mode == "auto":
self.auto_control(level_pct, pump_on)
# Check alarm conditions
self.check_alarms(level_pct)
# Publish telemetry
self.create_message("telemetry", {
"tank_level": level_pct,
"pump_running": pump_on,
"mode": mode,
})
def ma_to_percent(self, ma_value):
"""Convert 4-20mA sensor reading to tank level percentage."""
span = self.config.sensor_max_ma - self.config.sensor_min_ma
if span == 0:
return 0.0
pct = (ma_value - self.config.sensor_min_ma) / span * 100.0
return max(0.0, min(100.0, pct))
def auto_control(self, level_pct, pump_on):
"""Automatic pump control with hysteresis."""
if level_pct < self.config.pump_start_level and not pump_on:
self.platform_iface.set_do(0, True)
elif level_pct > self.config.pump_stop_level and pump_on:
self.platform_iface.set_do(0, False)
def update_pump_runtime(self, pump_on):
"""Track cumulative pump running hours."""
import time
if pump_on:
if self.pump_start_time is None:
self.pump_start_time = time.time()
else:
elapsed_hours = (time.time() - self.pump_start_time) / 3600
current = self.get_tag("pump_runtime_hours", default=0.0)
self.set_tag("pump_runtime_hours", current + elapsed_hours)
self.pump_start_time = time.time()
else:
self.pump_start_time = None
def check_alarms(self, level_pct):
"""Evaluate alarm conditions and send notifications."""
# Low level alarm
low_alarm = level_pct < self.config.low_level_alarm
prev_low = self.get_tag("low_level_alarm_active", default=False)
self.set_tag("low_level_alarm_active", low_alarm)
if low_alarm and not prev_low:
self.send_notification(
message=f"Tank level critically low: {level_pct:.1f}%",
title="Low Level Alarm",
severity="critical",
topic=self.config.notification_topic,
)
# High level alarm
high_alarm = level_pct > self.config.high_level_alarm
prev_high = self.get_tag("high_level_alarm_active", default=False)
self.set_tag("high_level_alarm_active", high_alarm)
if high_alarm and not prev_high:
self.send_notification(
message=f"Tank level critically high: {level_pct:.1f}%",
title="High Level Alarm",
severity="critical",
topic=self.config.notification_topic,
)
def on_message_create(self, event):
"""Handle commands received from the cloud."""
if event.channel_name != "commands":
return
action = event.data.get("action")
if action == "set_mode":
new_mode = event.data.get("mode", "auto")
self.set_tag("operating_mode", new_mode)
elif action == "pump_on":
self.platform_iface.set_do(0, True)
self.set_tag("operating_mode", "manual")
elif action == "pump_off":
self.platform_iface.set_do(0, False)
self.set_tag("operating_mode", "manual")
elif action == "reset_runtime":
self.set_tag("pump_runtime_hours", 0.0)
def on_pump_control(self, new_value):
"""Handle pump switch interaction from the web portal UI."""
self.platform_iface.set_do(0, new_value)
self.set_tag("operating_mode", "manual")
def check_can_shutdown(self):
"""Prevent shutdown while pump is running to avoid water hammer."""
pump_on = self.platform_iface.fetch_do(0)
if pump_on:
# Stop the pump before allowing shutdown
self.platform_iface.set_do(0, False)
return False # Defer shutdown for one cycle to let pump stop
return True
def on_shutdown_at(self, dt):
"""Clean shutdown: ensure pump is off and update status."""
self.platform_iface.set_do(0, False)
self.set_tag("pump_running", False)
self.set_tag("operating_mode", "shutdown")
if __name__ == "__main__":
run_app(PumpStationApp)
Key points:
- Configuration has multiple sections (tank parameters, pump thresholds, alarm thresholds, notification routing) that all appear as editable fields in the web portal.
- Tags use auto-logging triggers. The
tank_leveltag haslog_on=Delta(amount=5.0), meaning every 5% change is automatically logged for historical analysis. - UI includes display variables (tank level, pump status), interactive controls (
Switchfor pump control,Sliderfor setpoint), and organisational containers (alarm and system submodules). - Channel subscription on
"commands"allows the cloud (or other agents/processors) to send control commands like mode changes and pump start/stop. - Tag subscription on
"pump_control"reacts to the UI switch interaction in real time. - Notifications are sent only on alarm transitions (rising edge), not continuously, to avoid flooding operators with repeated alerts.
- Shutdown is deferred if the pump is running, preventing sudden pump stops that could cause water hammer. The pump is stopped first, and shutdown proceeds on the next cycle.
Next Steps
- Docker Applications Overview -- framework introduction
- Application Class -- full API reference
- Configuration Overview -- configuration schema system
- Tags Overview -- tag definitions and auto-logging
- UI Overview -- UI element types and bindings