Device Agent Interface
The DeviceAgentInterface is the primary communication bridge between a Docker application and the Doover cloud. It connects to the Device Agent (DDA) service running on the Doovit via gRPC and provides methods for channel operations, event subscriptions, message handling, and health monitoring.
In most applications, you do not create this interface directly. The run_app() entry point constructs it and passes it to your Application subclass. However, understanding its API is important for advanced use cases and testing.
Import
from pydoover.docker import DeviceAgentInterface
Constructor
dda = DeviceAgentInterface(
app_key="my_app",
dda_uri="127.0.0.1:50051",
dda_timeout=7,
max_conn_attempts=5,
time_between_connection_attempts=10,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
app_key | str | (required) | Identifies this application for tag scoping and event routing. |
dda_uri | str | "127.0.0.1:50051" | gRPC address of the Device Agent service. |
dda_timeout | int | 7 | Timeout in seconds for individual gRPC calls. |
max_conn_attempts | int | 5 | Maximum number of connection attempts before giving up. |
time_between_connection_attempts | int | 10 | Seconds to wait between connection retry attempts. |
The default dda_uri of 127.0.0.1:50051 assumes the Device Agent runs as a sidecar container on the same host network. This is the standard configuration on Doovit devices.
Status Properties
These properties reflect the current connection state of the Device Agent.
| Property | Type | Description |
|---|---|---|
is_dda_available | bool | True if the DDA service is reachable via gRPC. |
is_dda_online | bool | True if the DDA has an active connection to the Doover cloud. |
has_dda_been_online | bool | True if the DDA has successfully connected at least once since the application started. |
agent_id | str | The UUID of the agent this application is running on. |
if not self.device_agent.is_dda_online:
# Adjust behaviour during connectivity loss
self.set_tag("cloud_status", "offline")
This checks whether the Device Agent currently has a cloud connection. The application can continue operating offline; tags and messages are queued locally until the connection is restored.
Health Checking
wait_until_healthy(timeout)
Blocks until the Device Agent service is running and responsive. This is called automatically during application startup, but you can call it manually if needed.
await self.device_agent.wait_until_healthy(timeout=300)
| Parameter | Type | Default | Description |
|---|---|---|---|
timeout | int | 300 | Maximum seconds to wait before raising a timeout error. |
The method polls the DDA's health endpoint at regular intervals. If the DDA does not become healthy within the timeout, the call raises an error.
health_check()
Performs a single health check against the DDA service and returns immediately.
is_healthy = await self.device_agent.health_check()
Returns True if the DDA responded to the health check, False otherwise.
Event Subscriptions
Event subscriptions let your application react to data changes on specific channels. When a subscribed event occurs, the DDA delivers it to your registered callback.
add_event_callback(channel_name, callback, events)
Register a callback function for events on a specific channel.
async def handle_command(event):
command = event.data.get("action")
if command == "restart":
self.request_shutdown()
self.device_agent.add_event_callback(
channel_name="commands",
callback=handle_command,
events=["message_create"],
)
| Parameter | Type | Description |
|---|---|---|
channel_name | str | The name of the channel to subscribe to. |
callback | callable | Async function to call when an event occurs. |
events | list[str] | List of event types to listen for. |
Available event types:
| Event Type | Description |
|---|---|
"message_create" | A new message was published to the channel. |
"message_update" | An existing message on the channel was modified. |
"oneshot_message" | A transient message was received (not persisted in history). |
"aggregate_update" | The channel's aggregate was updated. |
"channel_sync" | The channel's full state was synchronised from the cloud. |
In most cases, you should use self.subscribe() on the Application class rather than calling add_event_callback() directly. The Application.subscribe() method routes events to the appropriate handler methods (on_message_create, on_aggregate_update, etc.) and handles the wiring for you.
wait_for_channels_sync(channel_names, timeout, inter_wait)
Wait until the specified channels have been synchronised from the cloud. This is useful during startup when you need channel data to be available before proceeding.
await self.device_agent.wait_for_channels_sync(
channel_names=["config", "commands", "tag_values"],
timeout=60,
inter_wait=2,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
channel_names | list[str] | (required) | Channels to wait for. |
timeout | int | 60 | Maximum seconds to wait for all channels. |
inter_wait | int | 2 | Seconds between polling attempts. |
The method blocks until all named channels have received their initial sync from the cloud, or until the timeout expires. This ensures your application has up-to-date data before starting its main loop.
Channel Operations
fetch_channel_aggregate(channel_name)
Fetch the current aggregate (consolidated state) for a channel.
aggregate = await self.device_agent.fetch_channel_aggregate("telemetry")
current_temp = aggregate.get("temperature")
After subscribing to a channel, its aggregate is cached in memory and updated automatically as events arrive. The first call fetches from the DDA; subsequent calls return the cached value until an update event arrives.
fetch_message(channel_name, message_id)
Fetch a specific message by its snowflake ID.
msg = await self.device_agent.fetch_message("commands", message_id=123456789)
print(msg.data)
list_messages(channel_name)
List messages in a channel, ordered by snowflake ID (creation time).
messages = await self.device_agent.list_messages("telemetry")
for msg in messages:
print(f"{msg.id}: {msg.data}")
create_message(channel_name, data, files, timestamp)
Publish a new message to a channel. Returns the message's snowflake ID.
msg_id = await self.device_agent.create_message(
channel_name="telemetry",
data={"temperature": 23.5, "humidity": 61.2},
)
| Parameter | Type | Description |
|---|---|---|
channel_name | str | Target channel name. |
data | dict | JSON-serialisable payload. |
files | dict or None | Optional dictionary of {filename: bytes} for attachments. |
timestamp | datetime or None | Optional timestamp override (uses current time if not set). |
update_message(channel_name, message_id, data, files)
Update an existing message's data or attachments.
await self.device_agent.update_message(
channel_name="telemetry",
message_id=msg_id,
data={"temperature": 24.0},
)
send_oneshot_message(channel_name, data)
Send a transient message that is delivered to subscribers but not persisted in the channel's message history.
await self.device_agent.send_oneshot_message(
channel_name="heartbeat",
data={"timestamp": "2025-01-15T10:30:00Z"},
)
Oneshot messages are useful for ephemeral signals (heartbeats, acknowledgements) that do not need to be stored.
update_channel_aggregate(channel_name, data)
Directly update a channel's aggregate without creating a message.
await self.device_agent.update_channel_aggregate(
channel_name="status",
data={"state": "running", "uptime_seconds": 86400},
)
fetch_message_attachment(channel_name, message_id, filename)
Download a file attachment from a message.
file_bytes = await self.device_agent.fetch_message_attachment(
channel_name="firmware",
message_id=msg_id,
filename="update.bin",
)
Aggregate Caching
When you subscribe to a channel's aggregate_update events, the DeviceAgentInterface caches the channel's aggregate in memory. Subsequent calls to fetch_channel_aggregate() return the cached value instantly without a gRPC call.
The cache is updated automatically whenever an aggregate update event arrives from the DDA. This means your application always has access to the latest aggregate state with minimal latency.
If you need to force a fresh fetch from the DDA (bypassing the cache), you can call the underlying gRPC method directly, but this is rarely necessary.
MockDeviceAgentInterface
For testing, pydoover provides MockDeviceAgentInterface, which simulates the DDA in memory without any gRPC connection.
from pydoover.docker import MockDeviceAgentInterface
mock_dda = MockDeviceAgentInterface(app_key="test")
# Simulate creating a message
await mock_dda.create_message("telemetry", {"temp": 23.5})
# Fetch the aggregate (reflects the message data)
agg = await mock_dda.fetch_channel_aggregate("telemetry")
assert agg["temp"] == 23.5
The mock interface records all operations and maintains in-memory channel state. It is the interface used when an Application is created with test_mode=True. Use it for unit tests and integration tests that verify application behaviour without requiring a running DDA service.
Next Steps
- Application Class -- the Application base class that wraps this interface
- Platform Interface -- hardware I/O operations
- Modbus Interface -- Modbus communication
- Channels -- cloud-side channel API documentation