DeviceAgentInterface

The DeviceAgentInterface provides communication with the Doover Device Agent (DDA), enabling your application to interact with the Doover cloud, subscribe to channels, and publish data.

Import

from pydoover.docker import DeviceAgentInterface

Overview

The Device Agent Interface uses gRPC to communicate with the Device Agent container, which maintains a persistent connection to the Doover cloud. This allows applications to:

  • Subscribe to cloud channels and receive real-time updates
  • Publish data to channels
  • Sync state between devices and the cloud
  • Get temporary API tokens for direct cloud access

Class Definition

class DeviceAgentInterface:
    def __init__(
        self,
        app_key: str,
        dda_uri: str = "127.0.0.1:50051",
        is_async: bool = None,
        dda_timeout: int = 7,
        max_conn_attempts: int = 5,
        time_between_connection_attempts: int = 10,
    )

Constructor Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| app_key | str | Required | Unique application identifier |
| dda_uri | str | "127.0.0.1:50051" | URI for the Device Agent gRPC service |
| is_async | bool | None | Force async mode |
| dda_timeout | int | 7 | Timeout for requests in seconds |
| max_conn_attempts | int | 5 | Maximum connection retry attempts |
| time_between_connection_attempts | int | 10 | Seconds between retry attempts |

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| is_dda_available | bool | Whether DDA is available |
| is_dda_online | bool | Whether DDA is currently connected to cloud |
| has_dda_been_online | bool | Whether DDA has connected at least once |
| agent_id | str | The agent ID from deployment config |
| last_channel_message_ts | dict | Timestamp of last message per channel |

Status Methods

get_is_dda_available()

Check if the Device Agent service is available.

def get_is_dda_available(self) -> bool

Example:

if self.device_agent.get_is_dda_available():
    print("Device Agent is available")

get_is_dda_online()

Check if the Device Agent is currently connected to the cloud.

def get_is_dda_online(self) -> bool

Example:

if self.device_agent.get_is_dda_online():
    print("Connected to cloud")
else:
    print("Offline - data will be queued")

get_has_dda_been_online()

Check if the Device Agent has been online at least once since startup.

def get_has_dda_been_online(self) -> bool
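
The three status getters can be combined to tell "never connected" apart from "connected earlier, offline now". The following is a minimal sketch under stated assumptions: `describe_dda_status` and `StatusStub` are hypothetical names introduced for illustration, and the stub only mimics the three getters documented above so the snippet runs without a Device Agent.

```python
def describe_dda_status(agent) -> str:
    """Summarise the three documented status getters as one label."""
    if not agent.get_is_dda_available():
        return "unavailable"
    if agent.get_is_dda_online():
        return "online"
    if agent.get_has_dda_been_online():
        return "offline (was online earlier)"
    return "waiting for first cloud connection"


class StatusStub:
    """Hypothetical stand-in exposing only the three status getters."""

    def __init__(self, available, online, been_online):
        self._available = available
        self._online = online
        self._been_online = been_online

    def get_is_dda_available(self):
        return self._available

    def get_is_dda_online(self):
        return self._online

    def get_has_dda_been_online(self):
        return self._been_online


print(describe_dda_status(StatusStub(True, False, True)))
```

Inside an application, the same function could be called as `describe_dda_status(self.device_agent)`.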

await_dda_available()

Wait for the Device Agent to become available.

def await_dda_available(self, timeout: int = 10) -> bool

Example:

# Wait up to 30 seconds for DDA
if self.device_agent.await_dda_available(timeout=30):
    print("DDA is ready")
else:
    print("DDA did not become available")

await_dda_available_async()

Async version of await_dda_available.

await self.device_agent.await_dda_available_async(timeout=30)

test_comms()

Test connection to the Device Agent by sending an echo message.

def test_comms(self, message: str = "Comms Check Message") -> str | None

Example:

response = self.device_agent.test_comms("Hello DDA")
if response:
    print(f"DDA responded: {response}")

Channel Subscription

add_subscription()

Subscribe to a channel to receive updates. The callback is invoked whenever new data arrives on the channel.

def add_subscription(
    self,
    channel_name: str,
    callback: Callable[[str, dict | str], Any]
) -> None

Parameters:

  • channel_name: Name of the channel to subscribe to
  • callback: Function called with (channel_name, aggregate_data) when updates arrive

Example:

async def on_config_update(channel_name, data):
    print(f"Received update on {channel_name}")
    print(f"Data: {data}")

async def on_sensor_command(channel_name, data):
    print(f"Command on {channel_name}: {data}")

# Subscribe to deployment config updates
self.device_agent.add_subscription("deployment_config", on_config_update)

# Subscribe to a custom channel
self.device_agent.add_subscription("sensor_commands", on_sensor_command)

The callback can be a regular function or an async function:

# Sync callback
def handle_update(channel_name, data):
    process_data(data)

# Async callback
async def handle_update_async(channel_name, data):
    await process_data_async(data)
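
One common way for a dispatcher to accept both kinds of callback is to call it and await the result only when it is awaitable. The sketch below illustrates that general pattern; it is not taken from pydoover's implementation, and `invoke_callback` is a hypothetical helper name.

```python
import asyncio
import inspect


async def invoke_callback(callback, channel_name, data):
    """Call a sync or async callback and return its result."""
    result = callback(channel_name, data)
    if inspect.isawaitable(result):
        # Async callbacks return a coroutine that still has to be awaited.
        result = await result
    return result


received = []


def handle_update(channel_name, data):
    received.append(("sync", channel_name, data))


async def handle_update_async(channel_name, data):
    await asyncio.sleep(0)  # stands in for real async work
    received.append(("async", channel_name, data))


async def demo():
    await invoke_callback(handle_update, "sensor_commands", {"cmd": "read"})
    await invoke_callback(handle_update_async, "sensor_commands", {"cmd": "read"})


asyncio.run(demo())
```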

is_channel_synced()

Check if a channel has received its initial sync from the Device Agent.

def is_channel_synced(self, channel_name: str) -> bool

Example:

if self.device_agent.is_channel_synced("deployment_config"):
    print("Config is synced")

wait_for_channels_sync_async()

Wait for multiple channels to be synced.

async def wait_for_channels_sync_async(
    self,
    channel_names: list[str],
    timeout: int = 5,
    inter_wait: float = 0.2
) -> bool

Example:

synced = await self.device_agent.wait_for_channels_sync_async(
    ["deployment_config", "tag_values"],
    timeout=10
)
if synced:
    print("All channels synced")
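
Only an async waiter is documented here. For synchronous code, a similar wait can be approximated by polling `is_channel_synced()`. The sketch below assumes that `timeout` and `inter_wait` mean "total seconds to wait" and "seconds between checks"; `wait_for_channels_sync` and `SyncStub` are hypothetical names, not part of the library.

```python
import time


def wait_for_channels_sync(agent, channel_names, timeout=5.0, inter_wait=0.2):
    """Poll is_channel_synced() until all channels are synced or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        if all(agent.is_channel_synced(name) for name in channel_names):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(inter_wait)


class SyncStub:
    """Hypothetical stand-in: a channel reports synced after N failed checks."""

    def __init__(self, checks_until_synced):
        self._remaining = dict(checks_until_synced)

    def is_channel_synced(self, channel_name):
        remaining = self._remaining.get(channel_name)
        if remaining is None:
            return False  # unknown channels never sync in this stub
        if remaining > 0:
            self._remaining[channel_name] = remaining - 1
            return False
        return True


stub = SyncStub({"deployment_config": 1, "tag_values": 2})
synced = wait_for_channels_sync(
    stub, ["deployment_config", "tag_values"], timeout=1.0, inter_wait=0.01
)
```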

Channel Publishing

publish_to_channel()

Publish data to a channel. This method works in both sync and async contexts.

def publish_to_channel(
    self,
    channel_name: str,
    message: dict | str,
    record_log: bool = True,
    max_age: int = None
) -> bool

Parameters:

  • channel_name: Name of the channel to publish to
  • message: Data to publish (dict or string)
  • record_log: Whether to save to the log (default: True)
  • max_age: Maximum number of seconds the message may wait before it is published to the cloud. Use -1 to publish immediately

Returns: True if publishing was successful, False otherwise

Example:

# Publish sensor data
result = self.device_agent.publish_to_channel(
    "sensor_data",
    {"temperature": 25.5, "humidity": 60}
)

# Publish immediately (bypass bundling)
self.device_agent.publish_to_channel(
    "alerts",
    {"type": "critical", "message": "High temperature"},
    max_age=-1
)

# Don't record to log
self.device_agent.publish_to_channel(
    "telemetry",
    {"voltage": 12.3},
    record_log=False
)
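
Because `publish_to_channel()` returns False on failure, callers may want to act on that result. The following is a minimal retry sketch; `publish_with_retry`, its `attempts` and `delay` parameters, and `FlakyStub` are hypothetical and only illustrate the pattern. Whether retrying suits a particular failure depends on the deployment.

```python
import time


def publish_with_retry(agent, channel_name, message, attempts=3, delay=0.5):
    """Retry publish_to_channel() while it keeps returning False."""
    for attempt in range(1, attempts + 1):
        if agent.publish_to_channel(channel_name, message):
            return True
        if attempt < attempts:
            time.sleep(delay)  # pause before the next attempt
    return False


class FlakyStub:
    """Hypothetical stand-in that rejects the first `failures` publishes."""

    def __init__(self, failures):
        self.failures = failures
        self.published = []

    def publish_to_channel(self, channel_name, message, record_log=True, max_age=None):
        if self.failures > 0:
            self.failures -= 1
            return False
        self.published.append((channel_name, message))
        return True


stub = FlakyStub(failures=2)
ok = publish_with_retry(stub, "sensor_data", {"temperature": 25.5}, delay=0)
```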

publish_to_channel_async()

Async version of publish_to_channel.

await self.device_agent.publish_to_channel_async(
    "sensor_data",
    {"temperature": 25.5}
)

Channel Data Retrieval

get_channel_aggregate()

Get the current aggregate data for a channel.

def get_channel_aggregate(self, channel_name: str) -> dict | str | None

Example:

config = self.device_agent.get_channel_aggregate("deployment_config")
if config:
    print(f"Current config: {config}")
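
Since the return type is `dict | str | None`, code that looks up a key should check the type first. A small sketch of such a guard follows; `get_aggregate_value` and `AggregateStub` are hypothetical names used for illustration.

```python
def get_aggregate_value(agent, channel_name, key, default=None):
    """Read one key from a channel aggregate, tolerating str or None results."""
    aggregate = agent.get_channel_aggregate(channel_name)
    if isinstance(aggregate, dict):
        return aggregate.get(key, default)
    # A string aggregate or a missing channel has no keys to look up.
    return default


class AggregateStub:
    """Hypothetical stand-in returning canned aggregates per channel."""

    def __init__(self, aggregates):
        self._aggregates = aggregates

    def get_channel_aggregate(self, channel_name):
        return self._aggregates.get(channel_name)


stub = AggregateStub({
    "deployment_config": {"sample_rate": 10},
    "notes": "plain text aggregate",
})
rate = get_aggregate_value(stub, "deployment_config", "sample_rate", default=1)
```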

get_channel_aggregate_async()

Async version of get_channel_aggregate.

config = await self.device_agent.get_channel_aggregate_async("deployment_config")

Token Management

get_temp_token()

Get a temporary API token for direct cloud access.

def get_temp_token(self) -> tuple[str, datetime, str] | None

Returns: Tuple of (token, expire_time, endpoint) or None if failed

Example:

result = self.device_agent.get_temp_token()
if result:
    token, expires, endpoint = result
    print(f"Token: {token}")
    print(f"Expires: {expires}")
    print(f"Endpoint: {endpoint}")
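
Because the token carries an expiry time, it can be reused until shortly before it expires rather than requested on every call. The sketch below shows one way to do that. It assumes `expire_time` is a `datetime` (naive values are compared against local time, aware values against their own time zone); `TokenCache`, `refresh_margin`, `TokenStub`, and the endpoint URL are all hypothetical.

```python
from datetime import datetime, timedelta, timezone


class TokenCache:
    """Reuse a temporary token until it is close to expiry."""

    def __init__(self, agent, refresh_margin=timedelta(seconds=60)):
        self._agent = agent
        self._margin = refresh_margin
        self._cached = None  # (token, expire_time, endpoint) or None

    def get(self):
        if self._cached is None or self._is_stale(self._cached[1]):
            # get_temp_token() may return None if no token could be issued.
            self._cached = self._agent.get_temp_token()
        return self._cached

    def _is_stale(self, expire_time):
        # Assumption: naive datetimes are local time, aware ones carry a tzinfo.
        now = datetime.now(expire_time.tzinfo)
        return now + self._margin >= expire_time


class TokenStub:
    """Hypothetical stand-in that issues numbered tokens with a fixed lifetime."""

    def __init__(self, lifetime):
        self.lifetime = lifetime
        self.calls = 0

    def get_temp_token(self):
        self.calls += 1
        expires = datetime.now(timezone.utc) + self.lifetime
        return (f"token-{self.calls}", expires, "https://example.invalid/api")


stub = TokenStub(lifetime=timedelta(hours=1))
cache = TokenCache(stub)
first = cache.get()
second = cache.get()  # served from the cache, no second request
```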

get_temp_token_async()

Async version of get_temp_token.

result = await self.device_agent.get_temp_token_async()

Channel Listening (Debug)

listen_channel()

Listen to a channel and print updates to the console. Useful for debugging.

def listen_channel(self, channel_name: str) -> None

Example:

# This will continuously print updates to stdout
self.device_agent.listen_channel("sensor_data")

Common Channels

The Doover platform uses several standard channels:

| Channel Name | Description |
| --- | --- |
| deployment_config | Application deployment configuration |
| tag_values | Tag values shared between applications |
| ui_state | User interface state |
| cmds | Commands from the cloud |

Usage Example

Here is a complete example showing common DeviceAgentInterface operations:

import time

from pydoover.docker import Application, run_app
from pydoover.config import Schema

class CloudApp(Application):
    def setup(self):
        # Check DDA status
        if not self.device_agent.get_is_dda_available():
            print("Warning: DDA not available yet")

        # Subscribe to commands
        self.device_agent.add_subscription("commands", self.on_command)

        # Subscribe to config updates
        self.device_agent.add_subscription(
            "deployment_config",
            self.on_config_update
        )

    async def on_command(self, channel_name, data):
        """Handle incoming commands from the cloud."""
        command = data.get("command")
        if command == "restart":
            print("Restart requested")
        elif command == "update_config":
            print(f"Config update: {data.get('config')}")

    async def on_config_update(self, channel_name, data):
        """Handle configuration updates."""
        print(f"Configuration updated: {data}")

    def main_loop(self):
        # Publish sensor data
        self.device_agent.publish_to_channel("sensor_data", {
            "temperature": self.read_temperature(),
            "humidity": self.read_humidity(),
            "timestamp": time.time()
        })

        # Check cloud status
        if self.device_agent.get_is_dda_online():
            print("Cloud connected")
        else:
            print("Cloud disconnected - data queued")

    def read_temperature(self):
        return 25.5

    def read_humidity(self):
        return 60.0

if __name__ == "__main__":
    run_app(CloudApp(config=Schema()))

MockDeviceAgentInterface

For testing, pydoover provides a mock implementation:

from pydoover.docker import MockDeviceAgentInterface

# Create mock interface
mock_dda = MockDeviceAgentInterface("test-app")

# Set channel data for testing
mock_dda.channels["deployment_config"] = {"key": "value"}

# Use in application
# (MyApp and MyConfig stand for your own Application and Schema subclasses)
app = MyApp(
    config=MyConfig(),
    device_agent=mock_dda,
    test_mode=True
)

The mock interface:

  • Always reports as available and online
  • Stores published data in channels dict
  • Does not require a real Device Agent service