Application Class
The Application class in pydoover.docker is the base class for all Docker applications running on Doovit devices. It provides the lifecycle framework, channel and tag operations, hardware proxies, and integration points for configuration, UI, and notifications.
Import
from pydoover.docker import Application, run_app
Constructor
The constructor is called by run_app() with arguments parsed from the command line. You rarely call it directly except in test mode.
app = Application(
app_key="my_app",
device_agent=device_agent_iface,
platform_iface=platform_iface,
modbus_iface=modbus_iface,
name="My Application",
test_mode=False,
config_fp=None,
healthcheck_port=49200,
)
| Parameter | Type | Description |
|---|---|---|
app_key | str | Unique identifier for this application on the agent. Used to scope tags and configuration. |
device_agent | DeviceAgentInterface | Interface for cloud communication (channels, messages, events). |
platform_iface | PlatformInterface | Interface for hardware I/O (digital/analog, pulse counters, system info). |
modbus_iface | ModbusInterface | Interface for Modbus RTU/TCP communication. |
name | str | Human-readable name for logging and identification. |
test_mode | bool | When True, uses mock interfaces for local testing without hardware. |
config_fp | str or None | Path to a local JSON config file. Overrides cloud-based configuration when set. |
healthcheck_port | int | Port for the HTTP healthcheck server (default 49200). |
Class Attributes
Three class-level attributes define the schema types that the framework instantiates during startup.
from pydoover.docker import Application
from pydoover import config
from pydoover.ui import UI, NumericVariable, Switch
from pydoover.tags import Tags, Number, Boolean
class MyConfig(config.Schema):
poll_interval = config.Number("Poll Interval", default=30, description="Seconds between sensor reads")
device_name = config.String("Device Name", default="sensor-01", description="Display name")
alerts_enabled = config.Boolean("Alerts Enabled", default=True, description="Enable alert notifications")
class MyTags(Tags):
temperature = Number(default=0.0)
pump_active = Boolean(default=False)
class MyUI(UI):
temp_display = NumericVariable("Temperature", units="C", precision=1)
pump_switch = Switch("Pump")
class MyApp(Application):
config_cls = MyConfig
tags_cls = MyTags
ui_cls = MyUI
| Attribute | Type | Description |
|---|---|---|
config_cls | type[Schema] | Configuration schema class. Parsed from deployment config and available as self.config. |
tags_cls | type[Tags] | Tag definitions class. Initialised with defaults and available as self.tags. |
ui_cls | type[UI] | UI element definitions class. Synchronised to the web portal and available as self.ui. |
Instance Attributes
After construction, the following attributes are available on the application instance.
| Attribute | Type | Description |
|---|---|---|
self.config | Schema instance | Parsed configuration values. Access fields as attributes (e.g., self.config.poll_interval). |
self.app_key | str | The application key used for tag scoping and identification. |
self.device_agent | DeviceAgentInterface | Cloud communication interface. |
self.platform_iface | PlatformInterface | Hardware I/O interface. |
self.modbus_iface | ModbusInterface | Modbus communication interface. |
self.tag_manager | TagManager | Internal manager for tag state and commits. |
self.rpc | RPCManager | Remote procedure call handler. |
self.ui_manager | UIManager | Internal manager for UI element synchronisation. |
self.tags | Tags instance | Tag values container (from tags_cls). |
self.ui | UI instance | UI elements container (from ui_cls). |
self.loop_target_period | float | Target seconds between loop iterations (default 1). |
self.test_mode | bool | Whether the application is running in test mode. |
Properties
| Property | Type | Description |
|---|---|---|
self.is_ready | bool | True after setup() has completed and the main loop has started. |
Lifecycle Methods
These are the methods you override in your Application subclass to implement your application logic.
setup()
Called once after configuration is loaded and all interfaces are connected. Use it for one-time initialisation.
def setup(self):
# Subscribe to channels
self.subscribe("commands", events=["message_create"])
# Initialise hardware
self.modbus_iface.open_bus(
bus_type="serial",
name="sensors",
serial_port="/dev/ttyUSB0",
serial_baud=9600,
)
# Set the loop period
self.loop_target_period = self.config.poll_interval
# Log startup
self.set_tag("status", "online")
This example subscribes to a command channel, opens a Modbus serial bus, configures the loop timing from the deployment config, and sets an initial status tag.
main_loop()
Called repeatedly at the rate specified by loop_target_period. Contains the application's core logic.
def main_loop(self):
# Read sensor data
temp = self.platform_iface.fetch_ai(0)
humidity = self.platform_iface.fetch_ai(1)
# Update tags
self.set_tag("temperature", temp)
self.set_tag("humidity", humidity)
# Publish to the cloud
self.create_message("telemetry", {
"temperature": temp,
"humidity": humidity,
})
# Control logic
if temp > self.config.high_temp_threshold:
self.platform_iface.set_do(0, True) # Turn on cooling
if self.config.alerts_enabled:
self.send_notification(
message=f"Temperature {temp}C exceeds threshold",
severity="warning",
topic="high_temp",
)
This reads analog sensors, updates tags and a telemetry channel, and applies control logic based on configuration values. Tags are committed automatically after each iteration.
close()
Called during the final shutdown sequence for resource cleanup. Override this if you need to release resources that the framework does not manage.
def close(self):
self.modbus_iface.close_bus(self.sensor_bus_id)
Channel Operations
subscribe(channel_name, events)
Subscribe to a channel to receive events when data changes. The events parameter specifies which event types to listen for.
self.subscribe("commands", events=["message_create", "aggregate_update"])
Available event types: "message_create", "message_update", "oneshot_message", "aggregate_update", "channel_sync".
Event Handlers
Override these methods to handle events from subscribed channels.
def on_message_create(self, event):
"""Called when a new message is created on a subscribed channel."""
channel_name = event.channel_name
data = event.data
print(f"New message on {channel_name}: {data}")
def on_message_update(self, event):
"""Called when an existing message is updated."""
pass
def on_oneshot_message(self, event):
"""Called when a oneshot message is received (not persisted)."""
pass
def on_aggregate_update(self, event):
"""Called when a channel aggregate is updated."""
pass
def on_channel_sync(self, event):
"""Called when a channel's full state is synchronised."""
pass
Each handler receives an event object with properties for the channel name, data payload, and metadata. Override only the handlers you need.
create_message(channel_name, data, files, timestamp)
Publish a new message to a channel. Returns the message's snowflake ID.
msg_id = self.create_message(
"telemetry",
data={"temperature": 23.5, "humidity": 61.2},
)
The files parameter accepts a dictionary of filename-to-bytes mappings for binary attachments. The timestamp parameter overrides the automatic timestamp if needed.
update_message(channel_name, message_id, data, files)
Update an existing message by its snowflake ID.
self.update_message("telemetry", msg_id, data={"temperature": 24.0})
update_channel_aggregate(channel_name, data)
Directly update a channel's aggregate without creating a message.
self.update_channel_aggregate("status", {"state": "running", "uptime": 3600})
Tag Operations
Tags are persistent key-value pairs scoped to an application. They survive restarts and are synchronised to the cloud. For a full introduction to the tag system, see Tags Overview.
get_tag(key, app_key, default)
Read a tag value. By default, tags are scoped to the current application's app_key.
# Get a tag from this application
temp = self.get_tag("temperature")
# Get a tag from a different application on the same agent
other_temp = self.get_tag("temperature", app_key="other_app")
# Provide a default if the tag does not exist
status = self.get_tag("status", default="unknown")
The app_key parameter lets you read tags owned by other applications on the same agent. This enables cross-application data sharing.
get_global_tag(key, default)
Read a global tag that is not scoped to any application.
firmware_version = self.get_global_tag("firmware_version", default="unknown")
Global tags are shared across all applications on the agent. They are typically set by the platform or system-level processes.
set_tag(key, value, app_key, only_if_changed, log)
Write a tag value. The change is staged and committed at the end of the current loop iteration.
# Set a tag on this application
self.set_tag("temperature", 23.5)
# Only update if the value has actually changed
self.set_tag("status", "online", only_if_changed=True)
# Force a log entry for this tag change
self.set_tag("pump_state", True, log=True)
The only_if_changed parameter avoids unnecessary cloud writes when the value has not changed. The log parameter forces a log entry for the tag change regardless of auto-logging triggers.
set_global_tag(key, value, only_if_changed, log)
Write a global tag (not scoped to any application).
self.set_global_tag("site_name", "Warehouse A")
subscribe_to_tag(key, callback, app_key, global_tag)
Register a callback that fires whenever a tag value changes.
def on_mode_change(new_value):
if new_value == "maintenance":
self.set_do(0, False) # Disable output in maintenance mode
self.subscribe_to_tag("operating_mode", on_mode_change)
# Subscribe to a global tag
self.subscribe_to_tag("firmware_version", on_firmware_change, global_tag=True)
Tag subscriptions are useful for reacting to configuration changes or commands sent from the cloud without polling.
Notifications
send_notification(message, title, severity, topic)
Send a notification to configured endpoints (email, SMS, push, webhook).
self.send_notification(
message="Sensor reading exceeded safe threshold: 85.3C",
title="High Temperature Alert",
severity="warning",
topic="temperature_alerts",
)
| Parameter | Type | Description |
|---|---|---|
message | str | The notification body text. |
title | str | Optional title or subject line. |
severity | str | Severity level (e.g., "info", "warning", "critical"). |
topic | str | Topic string for routing to specific notification subscriptions. |
Notifications are delivered to endpoints configured on the agent. See Notifications for endpoint configuration.
Shutdown Flow
The shutdown system gives applications control over when and how they stop.
request_shutdown()
Programmatically request the application to shut down.
if self.get_tag("decommissioned"):
self.request_shutdown()
check_can_shutdown()
Override to defer shutdown when critical operations are in progress. Return True to allow shutdown, False to defer.
def check_can_shutdown(self):
if self.firmware_update_in_progress:
return False
return True
The framework calls this method when a shutdown is requested. If it returns False, the shutdown is deferred and checked again on subsequent loop iterations.
on_shutdown_at(dt)
Override to perform cleanup before the application exits. The dt parameter is the datetime at which shutdown was scheduled.
def on_shutdown_at(self, dt):
self.set_tag("status", "shutting_down")
self.modbus_iface.close_bus(self.bus_id)
self.platform_iface.set_do(0, False) # Safe state
Platform Proxies
The Application class provides convenience methods that proxy to the PlatformInterface. These save you from writing self.platform_iface.method() repeatedly.
| Method | Description |
|---|---|
self.fetch_di(*pins) | Read digital input(s) |
self.fetch_ai(*pins) | Read analog input(s) |
self.fetch_do(*pins) | Read digital output state(s) |
self.set_do(pin, value) | Set a digital output |
self.schedule_do(pin, value, in_secs) | Schedule a digital output change |
self.fetch_ao(*pins) | Read analog output state(s) |
self.set_ao(pin, value) | Set an analog output |
self.schedule_ao(pin, value, in_secs) | Schedule an analog output change |
These proxy methods make application code more concise when hardware interaction is frequent.
Modbus Proxies
Similarly, these methods proxy to the ModbusInterface.
| Method | Description |
|---|---|
self.read_modbus_registers(...) | Read Modbus registers from a device |
self.write_modbus_registers(...) | Write Modbus registers to a device |
self.add_new_modbus_read_subscription(...) | Add a polling subscription for register reads |
Status Methods
These methods check the state of the Device Agent connection.
| Method | Returns | Description |
|---|---|---|
self.get_is_dda_available() | bool | Whether the Device Agent service is reachable. |
self.get_is_dda_online() | bool | Whether the Device Agent has an active cloud connection. |
self.get_has_dda_been_online() | bool | Whether the Device Agent has connected at least once since startup. |
Applications can continue running their main loop even when the Device Agent is offline. Tags and messages are queued locally and synchronised when connectivity is restored. Use get_is_dda_online() to adjust behaviour during connectivity loss.
Test Mode
Test mode allows you to run and test application logic without hardware or cloud connectivity.
from pydoover.docker import Application
class MyApp(Application):
config_cls = MyConfig
tags_cls = MyTags
def setup(self):
self.set_tag("status", "online")
def main_loop(self):
self.set_tag("counter", self.get_tag("counter", default=0) + 1)
# Create the app in test mode
app = MyApp(app_key="test", test_mode=True)
# Run setup
await app.setup()
# Step through loop iterations manually
await app.next()
assert app.get_tag("counter") == 1
await app.next()
assert app.get_tag("counter") == 2
In test mode, the DeviceAgentInterface is replaced with MockDeviceAgentInterface, which simulates channel and tag operations in memory. This lets you write automated tests that verify application behaviour without any external dependencies.
Complete Example
Here is a minimal but complete Docker application that reads a temperature sensor and publishes data to the cloud.
from pydoover.docker import Application, run_app
from pydoover import config
from pydoover.tags import Tags, Number
from pydoover.ui import UI, NumericVariable
class Config(config.Schema):
poll_interval = config.Number("Poll Interval", default=10, description="Seconds between reads")
temp_threshold = config.Number("Temp Threshold", default=40.0, description="Alert threshold (C)")
class AppTags(Tags):
temperature = Number(default=0.0)
class AppUI(UI):
temp_display = NumericVariable("Temperature", units="C", precision=1)
class TemperatureMonitor(Application):
config_cls = Config
tags_cls = AppTags
ui_cls = AppUI
def setup(self):
self.loop_target_period = self.config.poll_interval
self.set_tag("status", "online")
def main_loop(self):
temp = self.platform_iface.fetch_ai(0)
self.set_tag("temperature", temp)
self.create_message("telemetry", {"temperature": temp})
if temp > self.config.temp_threshold:
self.send_notification(
message=f"Temperature is {temp}C",
severity="warning",
topic="high_temp",
)
def on_shutdown_at(self, dt):
self.set_tag("status", "offline")
if __name__ == "__main__":
run_app(TemperatureMonitor)
This application reads an analog temperature sensor on each loop iteration, updates a tag and telemetry channel, and sends a notification when the temperature exceeds a configurable threshold.
Next Steps
- Device Agent Interface -- cloud communication details
- Platform Interface -- hardware I/O operations
- Modbus Interface -- Modbus communication
- Examples -- more complete example applications