DeviceAgentInterface
The DeviceAgentInterface provides communication with the Doover Device Agent (DDA), enabling your application to interact with the Doover cloud, subscribe to channels, and publish data.
Import
from pydoover.docker import DeviceAgentInterface
Overview
The Device Agent Interface uses gRPC to communicate with the Device Agent container, which maintains a persistent connection to the Doover cloud. This allows applications to:
- Subscribe to cloud channels and receive real-time updates
- Publish data to channels
- Sync state between devices and the cloud
- Get temporary API tokens for direct cloud access
Class Definition
class DeviceAgentInterface:
def __init__(
self,
app_key: str,
dda_uri: str = "127.0.0.1:50051",
is_async: bool = None,
dda_timeout: int = 7,
max_conn_attempts: int = 5,
time_between_connection_attempts: int = 10,
)
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| app_key | str | Required | Unique application identifier |
| dda_uri | str | "127.0.0.1:50051" | URI for the Device Agent gRPC service |
| is_async | bool | None | Force async mode |
| dda_timeout | int | 7 | Timeout for requests in seconds |
| max_conn_attempts | int | 5 | Maximum connection retry attempts |
| time_between_connection_attempts | int | 10 | Seconds between retry attempts |
Attributes
| Attribute | Type | Description |
|---|---|---|
| is_dda_available | bool | Whether DDA is available |
| is_dda_online | bool | Whether DDA is currently connected to cloud |
| has_dda_been_online | bool | Whether DDA has connected at least once |
| agent_id | str | The agent ID from deployment config |
| last_channel_message_ts | dict | Timestamp of last message per channel |
Status Methods
get_is_dda_available()
Check if the Device Agent service is available.
def get_is_dda_available(self) -> bool
Example:
if self.device_agent.get_is_dda_available():
print("Device Agent is available")
get_is_dda_online()
Check if the Device Agent is currently connected to the cloud.
def get_is_dda_online(self) -> bool
Example:
if self.device_agent.get_is_dda_online():
print("Connected to cloud")
else:
print("Offline - data will be queued")
get_has_dda_been_online()
Check if the Device Agent has been online at least once since startup.
def get_has_dda_been_online(self) -> bool
await_dda_available()
Wait for the Device Agent to become available.
def await_dda_available(self, timeout: int = 10) -> bool
Example:
# Wait up to 30 seconds for DDA
if self.device_agent.await_dda_available(timeout=30):
print("DDA is ready")
else:
print("DDA did not become available")
await_dda_available_async()
Async version of await_dda_available.
await self.device_agent.await_dda_available_async(timeout=30)
test_comms()
Test connection to the Device Agent by sending an echo message.
def test_comms(self, message: str = "Comms Check Message") -> str | None
Example:
response = self.device_agent.test_comms("Hello DDA")
if response:
print(f"DDA responded: {response}")
Channel Subscription
add_subscription()
Subscribe to a channel to receive updates. The callback is invoked whenever new data arrives on the channel.
def add_subscription(
self,
channel_name: str,
callback: Callable[[str, dict | str], Any]
) -> None
Parameters:
- `channel_name`: Name of the channel to subscribe to
- `callback`: Function called with `(channel_name, aggregate_data)` when updates arrive
Example:
async def on_config_update(channel_name, data):
print(f"Received update on {channel_name}")
print(f"Data: {data}")
# Subscribe to deployment config updates
self.device_agent.add_subscription("deployment_config", on_config_update)
# Subscribe to custom channel
self.device_agent.add_subscription("sensor_commands", on_sensor_command)
The callback can be a regular function or an async function:
# Sync callback
def handle_update(channel_name, data):
process_data(data)
# Async callback
async def handle_update_async(channel_name, data):
await process_data_async(data)
is_channel_synced()
Check if a channel has received its initial sync from the Device Agent.
def is_channel_synced(self, channel_name: str) -> bool
Example:
if self.device_agent.is_channel_synced("deployment_config"):
print("Config is synced")
wait_for_channels_sync_async()
Wait for multiple channels to be synced.
async def wait_for_channels_sync_async(
self,
channel_names: list[str],
timeout: int = 5,
inter_wait: float = 0.2
) -> bool
Example:
synced = await self.device_agent.wait_for_channels_sync_async(
["deployment_config", "tag_values"],
timeout=10
)
if synced:
print("All channels synced")
Channel Publishing
publish_to_channel()
Publish data to a channel. This method works in both sync and async contexts.
def publish_to_channel(
self,
channel_name: str,
message: dict | str,
record_log: bool = True,
max_age: int = None
) -> bool
Parameters:
- `channel_name`: Name of the channel to publish to
- `message`: Data to publish (dict or string)
- `record_log`: Whether to save to the log (default: True)
- `max_age`: Maximum seconds before publishing to cloud. Use `-1` to publish immediately
Returns: True if publishing was successful, False otherwise
Example:
# Publish sensor data
result = self.device_agent.publish_to_channel(
"sensor_data",
{"temperature": 25.5, "humidity": 60}
)
# Publish immediately (bypass bundling)
self.device_agent.publish_to_channel(
"alerts",
{"type": "critical", "message": "High temperature"},
max_age=-1
)
# Don't record to log
self.device_agent.publish_to_channel(
"telemetry",
{"voltage": 12.3},
record_log=False
)
publish_to_channel_async()
Async version of publish_to_channel.
await self.device_agent.publish_to_channel_async(
"sensor_data",
{"temperature": 25.5}
)
Channel Data Retrieval
get_channel_aggregate()
Get the current aggregate data for a channel.
def get_channel_aggregate(self, channel_name: str) -> dict | str | None
Example:
config = self.device_agent.get_channel_aggregate("deployment_config")
if config:
print(f"Current config: {config}")
get_channel_aggregate_async()
Async version of get_channel_aggregate.
config = await self.device_agent.get_channel_aggregate_async("deployment_config")
Token Management
get_temp_token()
Get a temporary API token for direct cloud access.
def get_temp_token(self) -> tuple[str, datetime, str] | None
Returns: Tuple of (token, expire_time, endpoint) or None if failed
Example:
result = self.device_agent.get_temp_token()
if result:
token, expires, endpoint = result
print(f"Token: {token}")
print(f"Expires: {expires}")
print(f"Endpoint: {endpoint}")
get_temp_token_async()
Async version of get_temp_token.
result = await self.device_agent.get_temp_token_async()
Channel Listening (Debug)
listen_channel()
Listen to a channel and print updates to the console. Useful for debugging.
def listen_channel(self, channel_name: str) -> None
Example:
# This will continuously print updates to stdout
self.device_agent.listen_channel("sensor_data")
Common Channels
The Doover platform uses several standard channels:
| Channel Name | Description |
|---|---|
| deployment_config | Application deployment configuration |
| tag_values | Tag values shared between applications |
| ui_state | User interface state |
| cmds | Commands from the cloud |
Usage Example
Here is a complete example showing common DeviceAgentInterface operations:
import time

from pydoover.docker import Application, run_app
from pydoover.config import Schema
class CloudApp(Application):
def setup(self):
# Check DDA status
if not self.device_agent.get_is_dda_available():
print("Warning: DDA not available yet")
# Subscribe to commands
self.device_agent.add_subscription("commands", self.on_command)
# Subscribe to config updates
self.device_agent.add_subscription(
"deployment_config",
self.on_config_update
)
async def on_command(self, channel_name, data):
"""Handle incoming commands from the cloud."""
command = data.get("command")
if command == "restart":
print("Restart requested")
elif command == "update_config":
print(f"Config update: {data.get('config')}")
async def on_config_update(self, channel_name, data):
"""Handle configuration updates."""
print(f"Configuration updated: {data}")
def main_loop(self):
# Publish sensor data
self.device_agent.publish_to_channel("sensor_data", {
"temperature": self.read_temperature(),
"humidity": self.read_humidity(),
"timestamp": time.time()
})
# Check cloud status
if self.device_agent.get_is_dda_online():
print("Cloud connected")
else:
print("Cloud disconnected - data queued")
def read_temperature(self):
return 25.5
def read_humidity(self):
return 60.0
if __name__ == "__main__":
run_app(CloudApp(config=Schema()))
MockDeviceAgentInterface
For testing, pydoover provides a mock implementation:
from pydoover.docker import MockDeviceAgentInterface
# Create mock interface
mock_dda = MockDeviceAgentInterface("test-app")
# Set channel data for testing
mock_dda.channels["deployment_config"] = {"key": "value"}
# Use in application
app = MyApp(
config=MyConfig(),
device_agent=mock_dda,
test_mode=True
)
The mock interface:
- Always reports as available and online
- Stores published data in the `channels` dict
- Does not require a real Device Agent service