Skip to content

Application Class

The Application class is the base class for all pydoover Docker applications. It provides the foundation for building IoT applications that communicate with the Doover cloud and interact with hardware.

Import

from pydoover.docker import Application, run_app

Class Overview

class Application:
    def __init__(
        self,
        config: Schema,
        app_key: str = None,
        is_async: bool = None,
        device_agent: DeviceAgentInterface = None,
        platform_iface: PlatformInterface = None,
        modbus_iface: ModbusInterface = None,
        name: str = None,
        test_mode: bool = False,
        config_fp: str = None,
        healthcheck_port: int = None,
    )

Constructor Parameters

ParameterTypeDefaultDescription
configSchemaRequiredConfiguration schema for the application
app_keystrNoneApplication key (usually set via CLI args)
is_asyncboolNoneForce async mode (auto-detected if not set)
device_agentDeviceAgentInterfaceNoneCustom device agent interface
platform_ifacePlatformInterfaceNoneCustom platform interface
modbus_ifaceModbusInterfaceNoneCustom modbus interface
namestrNoneApplication name (defaults to class name)
test_modeboolFalseEnable test mode for controlled loop execution
config_fpstrNonePath to configuration file
healthcheck_portintNonePort for healthcheck server (default: 49200)

Attributes

Core Attributes

AttributeTypeDescription
configSchemaThe configuration schema instance
device_agentDeviceAgentInterfaceInterface for cloud communication
platform_ifacePlatformInterfaceInterface for hardware I/O
modbus_ifaceModbusInterfaceInterface for Modbus communication
ui_managerUIManagerUser interface manager
app_keystrUnique application identifier
app_display_namestrHuman-readable application name
namestrInternal application name

Loop Control

AttributeTypeDefaultDescription
loop_target_periodfloat1Target seconds between loop iterations
dda_startup_timeoutint300Seconds to wait for Device Agent startup
force_log_on_shutdownboolFalseForce UI log on shutdown

Lifecycle Methods

setup()

Called once after the application initializes and before the main loop starts.

def setup(self):
    """Initialize resources and set up the application."""
    pass

# Or async version
async def setup(self):
    """Async initialization."""
    await self.async_init()

Use setup() for:

  • Initializing state machines
  • Setting up UI elements
  • Registering callbacks
  • Starting background tasks
  • Setting initial tag values

main_loop()

Called continuously at the interval specified by loop_target_period.

def main_loop(self):
    """Main application logic."""
    pass

# Or async version
async def main_loop(self):
    """Async main loop."""
    data = await self.fetch_data()

Use main_loop() for:

  • Reading sensor data
  • Processing state machines
  • Updating UI values
  • Setting tags

Tag Methods

get_tag()

Get a tag value for a specific application.

def get_tag(
    self,
    tag_key: str,
    app_key: str = None,
    default: Any = None
) -> Any | None

Parameters:

  • tag_key: The tag name to fetch
  • app_key: Application key (defaults to current app)
  • default: Default value if tag doesn't exist

Example:

# Get tag from current app
temperature = self.get_tag("temperature")

# Get tag with default
status = self.get_tag("status", default="unknown")

# Get tag from another app
other_value = self.get_tag("sensor_reading", app_key="other-app-1234")

get_global_tag()

Get a global tag value shared across all applications.

def get_global_tag(self, tag_key: str, default: Any = None) -> Any | None

Example:

system_status = self.get_global_tag("system_status")
shutdown_requested = self.get_global_tag("shutdown_requested", default=False)

set_tag()

Set a tag value. This method works in both sync and async contexts.

def set_tag(
    self,
    tag_key: str,
    value: Any,
    app_key: str = None,
    only_if_changed: bool = True
) -> None

Parameters:

  • tag_key: The tag name to set
  • value: The value to set
  • app_key: Application key (defaults to current app)
  • only_if_changed: Only publish if value changed (default: True)

Example:

# Set a tag for current app
self.set_tag("temperature", 25.5)

# Set a tag for another app
self.set_tag("command", "start", app_key="other-app-1234")

# Always publish even if unchanged
self.set_tag("heartbeat", True, only_if_changed=False)

set_tag_async()

Async version of set_tag().

await self.set_tag_async("temperature", 25.5)

set_tags()

Set multiple tags at once.

def set_tags(
    self,
    tags: dict[str, Any],
    app_key: str = None,
    only_if_changed: bool = True
) -> None

Example:

self.set_tags({
    "temperature": 25.5,
    "humidity": 60.0,
    "status": "online"
})

set_global_tag()

Set a global tag shared across all applications.

def set_global_tag(
    self,
    tag_key: str,
    value: Any,
    only_if_changed: bool = True
) -> None

Example:

self.set_global_tag("system_status", "operational")

subscribe_to_tag()

Subscribe to tag updates with a callback.

def subscribe_to_tag(
    self,
    tag_key: str,
    callback: Callable[[str, Any], Any],
    app_key: str = None,
    global_tag: bool = False
)

Example:

def on_temperature_change(tag_key, new_value):
    print(f"Temperature changed to {new_value}")

self.subscribe_to_tag("temperature", on_temperature_change)

Hardware I/O Methods

These methods delegate to the PlatformInterface. See the PlatformInterface documentation for details.

get_di()

Get digital input value.

def get_di(self, di: int) -> bool

Example:

button_pressed = self.get_di(0)

get_ai()

Get analog input value.

def get_ai(self, ai: int) -> float

Example:

voltage = self.get_ai(0)

get_do()

Get digital output value.

def get_do(self, do: int) -> bool

Example:

relay_state = self.get_do(0)

set_do()

Set digital output value.

def set_do(self, do: int, value: bool) -> None

Example:

self.set_do(0, True)  # Turn on relay

schedule_do()

Schedule a digital output change.

def schedule_do(self, do: int, value: bool, delay_secs: int) -> None

Example:

self.schedule_do(0, False, 10)  # Turn off in 10 seconds

get_ao()

Get analog output value.

def get_ao(self, ao: int) -> float

set_ao()

Set analog output value.

def set_ao(self, ao: int, value: float) -> None

Example:

self.set_ao(0, 3.3)  # Set to 3.3V

schedule_ao()

Schedule an analog output change.

def schedule_ao(self, ao: int, value: float, delay_secs: int) -> None

Channel Methods

subscribe_to_channel()

Subscribe to a channel for receiving updates.

def subscribe_to_channel(
    self,
    channel_name: str,
    callback: Callable[[str, dict], Any]
)

Example:

async def on_config_update(channel_name, data):
    print(f"Config updated: {data}")

self.subscribe_to_channel("deployment_config", on_config_update)

publish_to_channel()

Publish data to a channel.

def publish_to_channel(self, channel_name: str, data: str | dict)

Examples:

# Publish a dictionary
self.publish_to_channel("sensor_data", {"temperature": 25.5})

# Publish a string (useful for alerts and notifications)
self.publish_to_channel("significantAlerts", "Temperature threshold exceeded!")

get_channel_aggregate()

Get the current aggregate data for a channel.

def get_channel_aggregate(self, channel_name: str) -> dict | None

UI Methods

set_ui_elements()

Set the UI elements for the application.

def set_ui_elements(self, elements: list)

get_command()

Get a UI command value.

def get_command(self, name: str) -> Any

coerce_command()

Set a UI command to a specific value.

def coerce_command(self, name: str, value: Any)

set_ui_status_icon()

Set the status icon for the UI.

def set_ui_status_icon(self, icon: str)

Modbus Methods

These methods delegate to the ModbusInterface. See the ModbusInterface documentation for details.

read_modbus_registers()

Read registers from a Modbus device.

def read_modbus_registers(
    self,
    address: int,
    count: int,
    register_type: int,
    modbus_id: int = None,
    bus_id: str = None
) -> int | list[int] | None

write_modbus_registers()

Write values to Modbus registers.

def write_modbus_registers(
    self,
    address: int,
    values: list[int],
    register_type: int,
    modbus_id: int = None,
    bus_id: str = None
) -> bool

add_new_modbus_read_subscription()

Add a subscription to periodically read Modbus registers.

def add_new_modbus_read_subscription(
    self,
    address: int,
    count: int,
    register_type: int,
    callback: Callable,
    poll_secs: int = None,
    modbus_id: int = None,
    bus_id: str = None
)

Shutdown Methods

request_shutdown()

Request a system shutdown.

def request_shutdown(self) -> None

on_shutdown_at()

Callback when a shutdown is scheduled. Override this to perform cleanup.

async def on_shutdown_at(self, dt: datetime) -> None:
    """Called when shutdown is scheduled."""
    print(f"Shutdown at {dt}")

check_can_shutdown()

Check if the application can safely shutdown. Override this to add safety checks.

async def check_can_shutdown(self) -> bool:
    """Return True if safe to shutdown."""
    return True

Status Methods

is_ready

Property indicating if the application is ready.

@property
def is_ready(self) -> bool

wait_until_ready()

Wait until the application is ready.

await app.wait_until_ready()

get_is_dda_available()

Check if the Device Agent is available.

def get_is_dda_available(self) -> bool

get_is_dda_online()

Check if the Device Agent is currently online.

def get_is_dda_online(self) -> bool

get_has_dda_been_online()

Check if the Device Agent has been online at least once.

def get_has_dda_been_online(self) -> bool

Testing

Test Mode

Enable test mode to control loop execution manually:

async def test_my_app():
    app = MyApp(config=MyConfig(), test_mode=True)
    asyncio.create_task(run_app(app, start=False))

    # Wait for app to start
    await app.wait_until_ready()

    # Run one iteration of the main loop
    await app.next()

    # Check state
    assert app.get_tag("status") == "ready"

next()

Manually trigger one main loop iteration in test mode.

await app.next()

run_app() Function

The run_app() function initializes and runs an application.

def run_app(
    app: Application,
    start: bool = True,
    setup_logging: bool = True,
    log_formatter: logging.Formatter = None,
    log_filters: logging.Filter | list[logging.Filter] = None
)

Parameters:

  • app: The application instance to run
  • start: If True, runs in blocking mode. If False, returns an async runner
  • setup_logging: If True, configures logging automatically
  • log_formatter: Custom logging formatter
  • log_filters: Custom logging filters

Example:

from pydoover.docker import Application, run_app

class MyApp(Application):
    def setup(self):
        pass

    def main_loop(self):
        pass

if __name__ == "__main__":
    run_app(MyApp(config=MyConfig()))

Real-World Example: Complete Application Structure

This example from a Doover application template demonstrates the recommended pattern of separating concerns into dedicated modules for configuration, UI, and state management:

import time
from pydoover.docker import Application, run_app
from pydoover import ui

# app_config.py - Configuration schema
from pydoover import config

class SampleConfig(config.Schema):
    def __init__(self):
        self.outputs_enabled = config.Boolean("Digital Outputs Enabled", default=True)
        self.funny_message = config.String("A Funny Message")
        self.sim_app_key = config.Application("Simulator App Key")

# app_ui.py - UI elements
class SampleUI:
    def __init__(self):
        self.is_working = ui.BooleanVariable("is_working", "We Working?")
        self.uptime = ui.DateTimeVariable("uptime", "Started")
        self.send_alert = ui.Action("send_alert", "Send message as alert")
        self.test_message = ui.TextParameter("test_message", "Put in a message")
        self.test_output = ui.TextVariable("test_output", "Message received")

    def fetch(self):
        return [self.is_working, self.uptime, self.send_alert,
                self.test_message, self.test_output]

# application.py - Main application
class SampleApplication(Application):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.started = time.time()
        self.ui: SampleUI = None

    async def setup(self):
        self.ui = SampleUI()
        self.ui_manager.add_children(*self.ui.fetch())

    async def main_loop(self):
        self.ui.is_working.update(True)
        self.ui.uptime.update(self.started)

        # Read a tag from another application
        random_value = self.get_tag("random_value", self.config.sim_app_key.value)

    @ui.callback("send_alert")
    async def on_send_alert(self, new_value):
        await self.publish_to_channel("significantAlerts", self.ui.test_output.current_value)
        self.ui.send_alert.coerce(None)

    @ui.callback("test_message")
    async def on_text_parameter_change(self, new_value):
        self.ui.test_output.update(new_value)

# Entry point
def main():
    run_app(SampleApplication(config=SampleConfig()))

Source: app-template