Skip to content

Application Class

The Application class in pydoover.docker is the base class for all Docker applications running on Doovit devices. It provides the lifecycle framework, channel and tag operations, hardware proxies, and integration points for configuration, UI, and notifications.

Import

from pydoover.docker import Application, run_app

Constructor

The constructor is called by run_app() with arguments parsed from the command line. You rarely call it directly except in test mode.

app = Application(
    app_key="my_app",
    device_agent=device_agent_iface,
    platform_iface=platform_iface,
    modbus_iface=modbus_iface,
    name="My Application",
    test_mode=False,
    config_fp=None,
    healthcheck_port=49200,
)
ParameterTypeDescription
app_keystrUnique identifier for this application on the agent. Used to scope tags and configuration.
device_agentDeviceAgentInterfaceInterface for cloud communication (channels, messages, events).
platform_ifacePlatformInterfaceInterface for hardware I/O (digital/analog, pulse counters, system info).
modbus_ifaceModbusInterfaceInterface for Modbus RTU/TCP communication.
namestrHuman-readable name for logging and identification.
test_modeboolWhen True, uses mock interfaces for local testing without hardware.
config_fpstr or NonePath to a local JSON config file. Overrides cloud-based configuration when set.
healthcheck_portintPort for the HTTP healthcheck server (default 49200).

Class Attributes

Three class-level attributes define the schema types that the framework instantiates during startup.

from pydoover.docker import Application
from pydoover import config
from pydoover.ui import UI, NumericVariable, Switch
from pydoover.tags import Tags, Number, Boolean

class MyConfig(config.Schema):
    poll_interval = config.Number("Poll Interval", default=30, description="Seconds between sensor reads")
    device_name = config.String("Device Name", default="sensor-01", description="Display name")
    alerts_enabled = config.Boolean("Alerts Enabled", default=True, description="Enable alert notifications")

class MyTags(Tags):
    temperature = Number(default=0.0)
    pump_active = Boolean(default=False)

class MyUI(UI):
    temp_display = NumericVariable("Temperature", units="C", precision=1)
    pump_switch = Switch("Pump")

class MyApp(Application):
    config_cls = MyConfig
    tags_cls = MyTags
    ui_cls = MyUI
AttributeTypeDescription
config_clstype[Schema]Configuration schema class. Parsed from deployment config and available as self.config.
tags_clstype[Tags]Tag definitions class. Initialised with defaults and available as self.tags.
ui_clstype[UI]UI element definitions class. Synchronised to the web portal and available as self.ui.

Instance Attributes

After construction, the following attributes are available on the application instance.

AttributeTypeDescription
self.configSchema instanceParsed configuration values. Access fields as attributes (e.g., self.config.poll_interval).
self.app_keystrThe application key used for tag scoping and identification.
self.device_agentDeviceAgentInterfaceCloud communication interface.
self.platform_ifacePlatformInterfaceHardware I/O interface.
self.modbus_ifaceModbusInterfaceModbus communication interface.
self.tag_managerTagManagerInternal manager for tag state and commits.
self.rpcRPCManagerRemote procedure call handler.
self.ui_managerUIManagerInternal manager for UI element synchronisation.
self.tagsTags instanceTag values container (from tags_cls).
self.uiUI instanceUI elements container (from ui_cls).
self.loop_target_periodfloatTarget seconds between loop iterations (default 1).
self.test_modeboolWhether the application is running in test mode.

Properties

PropertyTypeDescription
self.is_readyboolTrue after setup() has completed and the main loop has started.

Lifecycle Methods

These are the methods you override in your Application subclass to implement your application logic.

setup()

Called once after configuration is loaded and all interfaces are connected. Use it for one-time initialisation.

def setup(self):
    # Subscribe to channels
    self.subscribe("commands", events=["message_create"])

    # Initialise hardware
    self.modbus_iface.open_bus(
        bus_type="serial",
        name="sensors",
        serial_port="/dev/ttyUSB0",
        serial_baud=9600,
    )

    # Set the loop period
    self.loop_target_period = self.config.poll_interval

    # Log startup
    self.set_tag("status", "online")

This example subscribes to a command channel, opens a Modbus serial bus, configures the loop timing from the deployment config, and sets an initial status tag.

main_loop()

Called repeatedly at the rate specified by loop_target_period. Contains the application's core logic.

def main_loop(self):
    # Read sensor data
    temp = self.platform_iface.fetch_ai(0)
    humidity = self.platform_iface.fetch_ai(1)

    # Update tags
    self.set_tag("temperature", temp)
    self.set_tag("humidity", humidity)

    # Publish to the cloud
    self.create_message("telemetry", {
        "temperature": temp,
        "humidity": humidity,
    })

    # Control logic
    if temp > self.config.high_temp_threshold:
        self.platform_iface.set_do(0, True)  # Turn on cooling
        if self.config.alerts_enabled:
            self.send_notification(
                message=f"Temperature {temp}C exceeds threshold",
                severity="warning",
                topic="high_temp",
            )

This reads analog sensors, updates tags and a telemetry channel, and applies control logic based on configuration values. Tags are committed automatically after each iteration.

close()

Called during the final shutdown sequence for resource cleanup. Override this if you need to release resources that the framework does not manage.

def close(self):
    self.modbus_iface.close_bus(self.sensor_bus_id)

Channel Operations

subscribe(channel_name, events)

Subscribe to a channel to receive events when data changes. The events parameter specifies which event types to listen for.

self.subscribe("commands", events=["message_create", "aggregate_update"])

Available event types: "message_create", "message_update", "oneshot_message", "aggregate_update", "channel_sync".

Event Handlers

Override these methods to handle events from subscribed channels.

def on_message_create(self, event):
    """Called when a new message is created on a subscribed channel."""
    channel_name = event.channel_name
    data = event.data
    print(f"New message on {channel_name}: {data}")

def on_message_update(self, event):
    """Called when an existing message is updated."""
    pass

def on_oneshot_message(self, event):
    """Called when a oneshot message is received (not persisted)."""
    pass

def on_aggregate_update(self, event):
    """Called when a channel aggregate is updated."""
    pass

def on_channel_sync(self, event):
    """Called when a channel's full state is synchronised."""
    pass

Each handler receives an event object with properties for the channel name, data payload, and metadata. Override only the handlers you need.

create_message(channel_name, data, files, timestamp)

Publish a new message to a channel. Returns the message's snowflake ID.

msg_id = self.create_message(
    "telemetry",
    data={"temperature": 23.5, "humidity": 61.2},
)

The files parameter accepts a dictionary of filename-to-bytes mappings for binary attachments. The timestamp parameter overrides the automatic timestamp if needed.

update_message(channel_name, message_id, data, files)

Update an existing message by its snowflake ID.

self.update_message("telemetry", msg_id, data={"temperature": 24.0})

update_channel_aggregate(channel_name, data)

Directly update a channel's aggregate without creating a message.

self.update_channel_aggregate("status", {"state": "running", "uptime": 3600})

Tag Operations

Tags are persistent key-value pairs scoped to an application. They survive restarts and are synchronised to the cloud. For a full introduction to the tag system, see Tags Overview.

get_tag(key, app_key, default)

Read a tag value. By default, tags are scoped to the current application's app_key.

# Get a tag from this application
temp = self.get_tag("temperature")

# Get a tag from a different application on the same agent
other_temp = self.get_tag("temperature", app_key="other_app")

# Provide a default if the tag does not exist
status = self.get_tag("status", default="unknown")

The app_key parameter lets you read tags owned by other applications on the same agent. This enables cross-application data sharing.

get_global_tag(key, default)

Read a global tag that is not scoped to any application.

firmware_version = self.get_global_tag("firmware_version", default="unknown")

Global tags are shared across all applications on the agent. They are typically set by the platform or system-level processes.

set_tag(key, value, app_key, only_if_changed, log)

Write a tag value. The change is staged and committed at the end of the current loop iteration.

# Set a tag on this application
self.set_tag("temperature", 23.5)

# Only update if the value has actually changed
self.set_tag("status", "online", only_if_changed=True)

# Force a log entry for this tag change
self.set_tag("pump_state", True, log=True)

The only_if_changed parameter avoids unnecessary cloud writes when the value has not changed. The log parameter forces a log entry for the tag change regardless of auto-logging triggers.

set_global_tag(key, value, only_if_changed, log)

Write a global tag (not scoped to any application).

self.set_global_tag("site_name", "Warehouse A")

subscribe_to_tag(key, callback, app_key, global_tag)

Register a callback that fires whenever a tag value changes.

def on_mode_change(new_value):
    if new_value == "maintenance":
        self.set_do(0, False)  # Disable output in maintenance mode

self.subscribe_to_tag("operating_mode", on_mode_change)

# Subscribe to a global tag
self.subscribe_to_tag("firmware_version", on_firmware_change, global_tag=True)

Tag subscriptions are useful for reacting to configuration changes or commands sent from the cloud without polling.

Notifications

send_notification(message, title, severity, topic)

Send a notification to configured endpoints (email, SMS, push, webhook).

self.send_notification(
    message="Sensor reading exceeded safe threshold: 85.3C",
    title="High Temperature Alert",
    severity="warning",
    topic="temperature_alerts",
)
ParameterTypeDescription
messagestrThe notification body text.
titlestrOptional title or subject line.
severitystrSeverity level (e.g., "info", "warning", "critical").
topicstrTopic string for routing to specific notification subscriptions.

Notifications are delivered to endpoints configured on the agent. See Notifications for endpoint configuration.

Shutdown Flow

The shutdown system gives applications control over when and how they stop.

request_shutdown()

Programmatically request the application to shut down.

if self.get_tag("decommissioned"):
    self.request_shutdown()

check_can_shutdown()

Override to defer shutdown when critical operations are in progress. Return True to allow shutdown, False to defer.

def check_can_shutdown(self):
    if self.firmware_update_in_progress:
        return False
    return True

The framework calls this method when a shutdown is requested. If it returns False, the shutdown is deferred and checked again on subsequent loop iterations.

on_shutdown_at(dt)

Override to perform cleanup before the application exits. The dt parameter is the datetime at which shutdown was scheduled.

def on_shutdown_at(self, dt):
    self.set_tag("status", "shutting_down")
    self.modbus_iface.close_bus(self.bus_id)
    self.platform_iface.set_do(0, False)  # Safe state

Platform Proxies

The Application class provides convenience methods that proxy to the PlatformInterface. These save you from writing self.platform_iface.method() repeatedly.

MethodDescription
self.fetch_di(*pins)Read digital input(s)
self.fetch_ai(*pins)Read analog input(s)
self.fetch_do(*pins)Read digital output state(s)
self.set_do(pin, value)Set a digital output
self.schedule_do(pin, value, in_secs)Schedule a digital output change
self.fetch_ao(*pins)Read analog output state(s)
self.set_ao(pin, value)Set an analog output
self.schedule_ao(pin, value, in_secs)Schedule an analog output change

These proxy methods make application code more concise when hardware interaction is frequent.

Modbus Proxies

Similarly, these methods proxy to the ModbusInterface.

MethodDescription
self.read_modbus_registers(...)Read Modbus registers from a device
self.write_modbus_registers(...)Write Modbus registers to a device
self.add_new_modbus_read_subscription(...)Add a polling subscription for register reads

Status Methods

These methods check the state of the Device Agent connection.

MethodReturnsDescription
self.get_is_dda_available()boolWhether the Device Agent service is reachable.
self.get_is_dda_online()boolWhether the Device Agent has an active cloud connection.
self.get_has_dda_been_online()boolWhether the Device Agent has connected at least once since startup.
Information Circle
Offline Operation

Applications can continue running their main loop even when the Device Agent is offline. Tags and messages are queued locally and synchronised when connectivity is restored. Use get_is_dda_online() to adjust behaviour during connectivity loss.

Test Mode

Test mode allows you to run and test application logic without hardware or cloud connectivity.

from pydoover.docker import Application

class MyApp(Application):
    config_cls = MyConfig
    tags_cls = MyTags

    def setup(self):
        self.set_tag("status", "online")

    def main_loop(self):
        self.set_tag("counter", self.get_tag("counter", default=0) + 1)

# Create the app in test mode
app = MyApp(app_key="test", test_mode=True)

# Run setup
await app.setup()

# Step through loop iterations manually
await app.next()
assert app.get_tag("counter") == 1

await app.next()
assert app.get_tag("counter") == 2

In test mode, the DeviceAgentInterface is replaced with MockDeviceAgentInterface, which simulates channel and tag operations in memory. This lets you write automated tests that verify application behaviour without any external dependencies.

Complete Example

Here is a minimal but complete Docker application that reads a temperature sensor and publishes data to the cloud.

from pydoover.docker import Application, run_app
from pydoover import config
from pydoover.tags import Tags, Number
from pydoover.ui import UI, NumericVariable

class Config(config.Schema):
    poll_interval = config.Number("Poll Interval", default=10, description="Seconds between reads")
    temp_threshold = config.Number("Temp Threshold", default=40.0, description="Alert threshold (C)")

class AppTags(Tags):
    temperature = Number(default=0.0)

class AppUI(UI):
    temp_display = NumericVariable("Temperature", units="C", precision=1)

class TemperatureMonitor(Application):
    config_cls = Config
    tags_cls = AppTags
    ui_cls = AppUI

    def setup(self):
        self.loop_target_period = self.config.poll_interval
        self.set_tag("status", "online")

    def main_loop(self):
        temp = self.platform_iface.fetch_ai(0)
        self.set_tag("temperature", temp)

        self.create_message("telemetry", {"temperature": temp})

        if temp > self.config.temp_threshold:
            self.send_notification(
                message=f"Temperature is {temp}C",
                severity="warning",
                topic="high_temp",
            )

    def on_shutdown_at(self, dt):
        self.set_tag("status", "offline")

if __name__ == "__main__":
    run_app(TemperatureMonitor)

This application reads an analog temperature sensor on each loop iteration, updates a tag and telemetry channel, and sends a notification when the temperature exceeds a configurable threshold.

Next Steps