Skip to content

Device Agent Interface

The DeviceAgentInterface is the primary communication bridge between a Docker application and the Doover cloud. It connects to the Device Agent (DDA) service running on the Doovit via gRPC and provides methods for channel operations, event subscriptions, message handling, and health monitoring.

In most applications, you do not create this interface directly. The run_app() entry point constructs it and passes it to your Application subclass. However, understanding its API is important for advanced use cases and testing.

Import

from pydoover.docker import DeviceAgentInterface

Constructor

dda = DeviceAgentInterface(
    app_key="my_app",
    dda_uri="127.0.0.1:50051",
    dda_timeout=7,
    max_conn_attempts=5,
    time_between_connection_attempts=10,
)
ParameterTypeDefaultDescription
app_keystr(required)Identifies this application for tag scoping and event routing.
dda_uristr"127.0.0.1:50051"gRPC address of the Device Agent service.
dda_timeoutint7Timeout in seconds for individual gRPC calls.
max_conn_attemptsint5Maximum number of connection attempts before giving up.
time_between_connection_attemptsint10Seconds to wait between connection retry attempts.
Information Circle
Default Address

The default dda_uri of 127.0.0.1:50051 assumes the Device Agent runs as a sidecar container on the same host network. This is the standard configuration on Doovit devices.

Status Properties

These properties reflect the current connection state of the Device Agent.

PropertyTypeDescription
is_dda_availableboolTrue if the DDA service is reachable via gRPC.
is_dda_onlineboolTrue if the DDA has an active connection to the Doover cloud.
has_dda_been_onlineboolTrue if the DDA has successfully connected at least once since the application started.
agent_idstrThe UUID of the agent this application is running on.
if not self.device_agent.is_dda_online:
    # Adjust behaviour during connectivity loss
    self.set_tag("cloud_status", "offline")

This checks whether the Device Agent currently has a cloud connection. The application can continue operating offline; tags and messages are queued locally until the connection is restored.

Health Checking

wait_until_healthy(timeout)

Blocks until the Device Agent service is running and responsive. This is called automatically during application startup, but you can call it manually if needed.

await self.device_agent.wait_until_healthy(timeout=300)
ParameterTypeDefaultDescription
timeoutint300Maximum seconds to wait before raising a timeout error.

The method polls the DDA's health endpoint at regular intervals. If the DDA does not become healthy within the timeout, the call raises an error.

health_check()

Performs a single health check against the DDA service and returns immediately.

is_healthy = await self.device_agent.health_check()

Returns True if the DDA responded to the health check, False otherwise.

Event Subscriptions

Event subscriptions let your application react to data changes on specific channels. When a subscribed event occurs, the DDA delivers it to your registered callback.

add_event_callback(channel_name, callback, events)

Register a callback function for events on a specific channel.

async def handle_command(event):
    command = event.data.get("action")
    if command == "restart":
        self.request_shutdown()

self.device_agent.add_event_callback(
    channel_name="commands",
    callback=handle_command,
    events=["message_create"],
)
ParameterTypeDescription
channel_namestrThe name of the channel to subscribe to.
callbackcallableAsync function to call when an event occurs.
eventslist[str]List of event types to listen for.

Available event types:

Event TypeDescription
"message_create"A new message was published to the channel.
"message_update"An existing message on the channel was modified.
"oneshot_message"A transient message was received (not persisted in history).
"aggregate_update"The channel's aggregate was updated.
"channel_sync"The channel's full state was synchronised from the cloud.
Information Circle
Application-Level Subscription

In most cases, you should use self.subscribe() on the Application class rather than calling add_event_callback() directly. The Application.subscribe() method routes events to the appropriate handler methods (on_message_create, on_aggregate_update, etc.) and handles the wiring for you.

wait_for_channels_sync(channel_names, timeout, inter_wait)

Wait until the specified channels have been synchronised from the cloud. This is useful during startup when you need channel data to be available before proceeding.

await self.device_agent.wait_for_channels_sync(
    channel_names=["config", "commands", "tag_values"],
    timeout=60,
    inter_wait=2,
)
ParameterTypeDefaultDescription
channel_nameslist[str](required)Channels to wait for.
timeoutint60Maximum seconds to wait for all channels.
inter_waitint2Seconds between polling attempts.

The method blocks until all named channels have received their initial sync from the cloud, or until the timeout expires. This ensures your application has up-to-date data before starting its main loop.

Channel Operations

fetch_channel_aggregate(channel_name)

Fetch the current aggregate (consolidated state) for a channel.

aggregate = await self.device_agent.fetch_channel_aggregate("telemetry")
current_temp = aggregate.get("temperature")

After subscribing to a channel, its aggregate is cached in memory and updated automatically as events arrive. The first call fetches from the DDA; subsequent calls return the cached value until an update event arrives.

fetch_message(channel_name, message_id)

Fetch a specific message by its snowflake ID.

msg = await self.device_agent.fetch_message("commands", message_id=123456789)
print(msg.data)

list_messages(channel_name)

List messages in a channel, ordered by snowflake ID (creation time).

messages = await self.device_agent.list_messages("telemetry")
for msg in messages:
    print(f"{msg.id}: {msg.data}")

create_message(channel_name, data, files, timestamp)

Publish a new message to a channel. Returns the message's snowflake ID.

msg_id = await self.device_agent.create_message(
    channel_name="telemetry",
    data={"temperature": 23.5, "humidity": 61.2},
)
ParameterTypeDescription
channel_namestrTarget channel name.
datadictJSON-serialisable payload.
filesdict or NoneOptional dictionary of {filename: bytes} for attachments.
timestampdatetime or NoneOptional timestamp override (uses current time if not set).

update_message(channel_name, message_id, data, files)

Update an existing message's data or attachments.

await self.device_agent.update_message(
    channel_name="telemetry",
    message_id=msg_id,
    data={"temperature": 24.0},
)

send_oneshot_message(channel_name, data)

Send a transient message that is delivered to subscribers but not persisted in the channel's message history.

await self.device_agent.send_oneshot_message(
    channel_name="heartbeat",
    data={"timestamp": "2025-01-15T10:30:00Z"},
)

Oneshot messages are useful for ephemeral signals (heartbeats, acknowledgements) that do not need to be stored.

update_channel_aggregate(channel_name, data)

Directly update a channel's aggregate without creating a message.

await self.device_agent.update_channel_aggregate(
    channel_name="status",
    data={"state": "running", "uptime_seconds": 86400},
)

fetch_message_attachment(channel_name, message_id, filename)

Download a file attachment from a message.

file_bytes = await self.device_agent.fetch_message_attachment(
    channel_name="firmware",
    message_id=msg_id,
    filename="update.bin",
)

Aggregate Caching

When you subscribe to a channel's aggregate_update events, the DeviceAgentInterface caches the channel's aggregate in memory. Subsequent calls to fetch_channel_aggregate() return the cached value instantly without a gRPC call.

The cache is updated automatically whenever an aggregate update event arrives from the DDA. This means your application always has access to the latest aggregate state with minimal latency.

If you need to force a fresh fetch from the DDA (bypassing the cache), you can call the underlying gRPC method directly, but this is rarely necessary.

MockDeviceAgentInterface

For testing, pydoover provides MockDeviceAgentInterface, which simulates the DDA in memory without any gRPC connection.

from pydoover.docker import MockDeviceAgentInterface

mock_dda = MockDeviceAgentInterface(app_key="test")

# Simulate creating a message
await mock_dda.create_message("telemetry", {"temp": 23.5})

# Fetch the aggregate (reflects the message data)
agg = await mock_dda.fetch_channel_aggregate("telemetry")
assert agg["temp"] == 23.5

The mock interface records all operations and maintains in-memory channel state. It is the interface used when an Application is created with test_mode=True. Use it for unit tests and integration tests that verify application behaviour without requiring a running DDA service.

Next Steps