Channels and Messaging
Channels are the primary mechanism for storing and transmitting state in the Doover platform. They support aggregated state, message history, and pub/sub patterns.
Channel Types
The Cloud API provides three channel types:
| Type | Name Prefix | Description |
|---|---|---|
| Channel | (none) | Base channel for state storage |
| Processor | # | Contains code for lambda-like function execution |
| Task | ! | Triggers a processor when messages are published |
Channel Class
The base Channel class represents a data channel in Doover.
Attributes
| Attribute | Type | Description |
|---|---|---|
| key | str | The unique identifier (UUID) for the channel |
| id | str | Alias for key |
| name | str | The channel name |
| agent_id | str | The owner agent's unique identifier |
Properties
| Property | Type | Description |
|---|---|---|
| aggregate | Any | The current aggregate data for the channel |
| last_message | Message | The most recent message in the channel |
| last_update_age | float | Age of the last message in seconds |
Methods
| Method | Returns | Description |
|---|---|---|
| update() | None | Refreshes channel data from the server |
| fetch_agent() | Agent | Fetches the owning agent |
| fetch_aggregate() | dict \| str | Fetches the aggregate data |
| fetch_messages(num_messages) | list[Message] | Fetches recent messages |
| fetch_messages_in_window(window_start, window_end) | list[Message] | Fetches messages in a time range |
| publish(data, ...) | None | Publishes data to the channel |
| update_from_file(path, mime_type) | None | Publishes file contents |
| get_tunnel_url(address) | str | Gets tunnel URL (tunnels channel only) |
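Putting the pieces together, a minimal status check might look like the sketch below (the channel ID and the 30-second staleness threshold are placeholders; only attributes and methods from the tables above are used):
from pydoover.cloud.api import Client

client = Client()
channel = client.get_channel("channel-uuid")

# Refresh cached data from the server, then inspect current state
channel.update()
print(channel.aggregate)

# Inspect the most recent message, if any
last = channel.last_message
if last is not None:
    print(f"Last payload: {last.fetch_payload()}")

# Flag channels that have gone quiet
age = channel.last_update_age
if age is not None and age > 30:
    print(f"No updates for {age:.0f} seconds")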
Creating Channels
Regular Channels
from pydoover.cloud.api import Client
client = Client()
# Create a new channel (or get existing)
channel = client.create_channel("sensor-data", agent_id="agent-uuid")
print(f"Channel ID: {channel.key}")
Processor Channels
Processor channels contain code that runs when triggered by a task:
# Create a processor channel
processor = client.create_processor("data-processor", agent_id="agent-uuid")
# or
processor = client.create_channel("#data-processor", agent_id="agent-uuid")
# Update processor with code package
processor.update_from_package("/path/to/processor/package")
Task Channels
Task channels trigger processors when they receive messages:
# Create a task that triggers the processor
task = client.create_task(
    task_name="process-data",
    agent_id="agent-uuid",
    processor_id=processor.key
)
# Subscribe the task to a data channel
task.subscribe_to_channel(data_channel.key)
Retrieving Channels
By ID
channel = client.get_channel("d1c7e8e3-f47b-4c68-86d7-65054d9e97d3")
By Name
channel = client.get_channel_named("sensor-data", agent_id="agent-uuid")
Publishing Data
Using Channel Object
channel = client.get_channel("channel-uuid")
# Publish JSON data
channel.publish({
    "temperature": 25.5,
    "humidity": 60,
    "status": "online"
})
# Publish with options
channel.publish(
    data={"value": 42},
    save_log=True,            # Save to message history
    log_aggregate=False,      # Don't log aggregate changes
    override_aggregate=True   # Replace entire aggregate
)
Using Client Methods
# Publish by channel ID
client.publish_to_channel(
    channel_id="channel-uuid",
    data={"key": "value"}
)
# Publish by channel name
client.publish_to_channel_name(
    agent_id="agent-uuid",
    channel_name="sensor-data",
    data={"key": "value"}
)
Publishing Options
| Option | Type | Default | Description |
|---|---|---|---|
| save_log | bool | True | Save message to channel history |
| log_aggregate | bool | False | Log aggregate data after publish |
| override_aggregate | bool | False | Replace aggregate instead of merging |
| timestamp | datetime | None | Custom timestamp for the message |
from datetime import datetime, timezone
# Publish with custom timestamp
channel.publish(
    data={"reading": 42},
    timestamp=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
)
# Publish without saving to log (aggregate only)
channel.publish(
    data={"status": "active"},
    save_log=False
)
Working with Aggregates
The aggregate represents the current state of a channel, merged from all published messages:
channel = client.get_channel("channel-uuid")
# Access cached aggregate
print(channel.aggregate)
# Fetch fresh aggregate from server
fresh_aggregate = channel.fetch_aggregate()
print(fresh_aggregate)
Override vs Merge
By default, published data merges with the existing aggregate:
# Current aggregate: {"a": 1, "b": 2}
channel.publish({"b": 3, "c": 4})
# New aggregate: {"a": 1, "b": 3, "c": 4}
Use override_aggregate=True to replace entirely:
# Current aggregate: {"a": 1, "b": 2}
channel.publish({"x": 10}, override_aggregate=True)
# New aggregate: {"x": 10}
Message Class
The Message class represents a message in a channel.
Attributes
| Attribute | Type | Description |
|---|---|---|
| id | str | Unique message identifier |
| channel_id | str | Channel the message belongs to |
| agent_id | str | Agent that sent the message |
| channel_name | str | Name of the channel |
Properties
| Property | Type | Description |
|---|---|---|
| timestamp | datetime | Message creation time (UTC) |
| age | float | Age of the message in seconds |
Methods
| Method | Returns | Description |
|---|---|---|
| update() | None | Refreshes message data from server |
| delete() | None | Deletes the message |
| fetch_payload() | dict \| str | Fetches the message payload |
| to_dict() | dict | Converts message to dictionary |
| get_age() | float | Returns message age in seconds |
| get_timestamp() | datetime | Returns message timestamp |
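As a quick illustration of these accessors (a sketch; assumes the channel already holds messages):
channel = client.get_channel("channel-uuid")
for msg in channel.fetch_messages(num_messages=5):
    # timestamp and get_age() come from the properties/methods above
    print(f"{msg.id} sent {msg.get_age():.0f}s ago at {msg.timestamp}")
    print(msg.to_dict())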
Retrieving Messages
Recent Messages
# Get last 10 messages
messages = client.get_channel_messages("channel-uuid", num_messages=10)
for msg in messages:
    print(f"[{msg.timestamp}] {msg.fetch_payload()}")
Messages in Time Window
from datetime import datetime, timedelta, timezone
end = datetime.now(timezone.utc)
start = end - timedelta(hours=24)
messages = client.get_channel_messages_in_window("channel-uuid", start, end)
print(f"Found {len(messages)} messages in the last 24 hours")
Single Message
message = client.get_message("channel-uuid", "message-uuid")
print(message.fetch_payload())
Using Channel Object
channel = client.get_channel("channel-uuid")
# Fetch recent messages
messages = channel.fetch_messages(num_messages=20)
# Fetch messages in time window
from datetime import datetime, timedelta, timezone
messages = channel.fetch_messages_in_window(
    window_start=datetime.now(timezone.utc) - timedelta(hours=1),
    window_end=datetime.now(timezone.utc)
)
Message Operations
Accessing Payload
message = messages[0]
# Fetch payload (loads from server if not cached)
payload = message.fetch_payload()
print(payload)
Deleting Messages
message = client.get_message("channel-uuid", "message-uuid")
message.delete()
Message from CSV Export
# Import messages from a CSV export file
messages = Message.from_csv_export(client, "/path/to/export.csv")
for msg in messages:
    print(f"{msg.channel_name}: {msg.fetch_payload()}")
Processor Class
The Processor class extends Channel for processor-specific operations.
Methods
| Method | Description |
|---|---|
| update_from_package(path) | Upload a package directory as processor code |
| invoke_locally(...) | Execute processor code locally for testing |
Updating Processor Code
processor = client.get_channel_named("#my-processor", agent_id="agent-uuid")
# Upload new processor code
processor.update_from_package("/path/to/processor/src")
The package directory is zipped and uploaded as base64-encoded data.
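Conceptually, the encoding is similar to the stdlib sketch below (illustrative only; update_from_package handles this for you, and the exact wire format may differ):
import base64
import io
import zipfile
from pathlib import Path

def encode_package(package_dir: str) -> str:
    # Zip the directory in memory, then base64-encode the archive
    buf = io.BytesIO()
    root = Path(package_dir)
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in root.rglob("*"):
            if path.is_file():
                zf.write(path, str(path.relative_to(root)))
    return base64.b64encode(buf.getvalue()).decode("ascii")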
Local Invocation
For testing, you can invoke processor code locally:
processor.invoke_locally(
    package_dir="/path/to/processor/src",
    agent_id="agent-uuid",
    access_token=client.access_token.token,
    api_endpoint="https://my.doover.dev",
    package_config={"setting": "value"},
    msg_obj={"trigger": "data"},
    agent_settings={"deployment_config": {}}
)
Task Class
The Task class extends Channel for task-specific operations.
Attributes
| Attribute | Type | Description |
|---|---|---|
| processor_key | str | Key of the associated processor |
| processor_id | str | Alias for processor_key |
Methods
| Method | Returns | Description |
|---|---|---|
| fetch_processor() | Processor | Fetches the associated processor |
| subscribe_to_channel(channel_id) | bool | Subscribes task to a channel |
| unsubscribe_from_channel(channel_id) | bool | Unsubscribes task from a channel |
| invoke_locally(...) | None | Executes task locally for testing |
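For example, to walk from a task back to the processor it triggers (a sketch; the task name and agent ID are placeholders):
task = client.get_channel_named("!my-task", agent_id="agent-uuid")
# Resolve the processor this task points at
processor = task.fetch_processor()
print(f"{task.name} triggers {processor.name} ({task.processor_id})")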
Task Subscriptions
A task subscribes to one or more channels; whenever a subscribed channel receives a message, the task triggers its processor:
task = client.get_channel_named("!my-task", agent_id="agent-uuid")
# Subscribe to a data channel
task.subscribe_to_channel("data-channel-uuid")
# Unsubscribe
task.unsubscribe_from_channel("data-channel-uuid")
Using Client Methods
client.subscribe_to_channel(
    channel_id="data-channel-uuid",
    task_id="task-uuid"
)
client.unsubscribe_from_channel(
    channel_id="data-channel-uuid",
    task_id="task-uuid"
)
Publishing Files
Channels can store file data as base64-encoded content:
channel = client.get_channel("channel-uuid")
# Upload a file
channel.update_from_file("/path/to/image.png")
# Specify MIME type explicitly
channel.update_from_file("/path/to/data.bin", mime_type="application/octet-stream")
The file is stored in the aggregate as:
{
    "output_type": "image/png",
    "output": "base64-encoded-data..."
}
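Reading the file back is then a matter of base64-decoding the aggregate (a sketch, assuming the aggregate layout shown above; the output filename is arbitrary):
import base64

aggregate = channel.fetch_aggregate()
if isinstance(aggregate, dict) and "output" in aggregate:
    raw = base64.b64decode(aggregate["output"])
    with open("downloaded.png", "wb") as f:
        f.write(raw)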
Tunnel URLs
The special tunnels channel provides remote access URLs:
tunnels_channel = client.get_channel_named("tunnels", agent_id="agent-uuid")
# Get URL for a specific address
url = tunnels_channel.get_tunnel_url("192.168.1.100:8080")
if url:
    print(f"Tunnel URL: {url}")
Complete Example
from pydoover.cloud.api import Client
client = Client()
agent_id = "your-agent-uuid"
# Create a sensor data channel
sensor_channel = client.create_channel("temperature-readings", agent_id)
# Publish some readings
for i in range(5):
    sensor_channel.publish({
        "reading_id": i,
        "temperature": 20 + i * 0.5,
        "unit": "celsius"
    })
# Create a processor for data analysis
processor = client.create_processor("analyze-temperature", agent_id)
# Create a task that triggers the processor
task = client.create_task("temperature-analyzer", agent_id, processor.key)
# Subscribe task to the sensor channel
task.subscribe_to_channel(sensor_channel.key)
# Fetch recent messages
messages = sensor_channel.fetch_messages(num_messages=10)
print(f"Last {len(messages)} readings:")
for msg in messages:
    payload = msg.fetch_payload()
    print(f" {msg.timestamp}: {payload['temperature']}C")
# Get aggregate state
print(f"\nCurrent aggregate: {sensor_channel.aggregate}")
# Check last update time
age = sensor_channel.last_update_age
if age is not None:
    print(f"Last update: {age:.1f} seconds ago")
Error Handling
from pydoover.cloud.api import Client, NotFound, Forbidden
client = Client()
try:
    channel = client.get_channel("invalid-uuid")
except NotFound:
    print("Channel not found")
except Forbidden:
    print("Access denied")
try:
    channel = client.get_channel_named("nonexistent", "agent-uuid")
except NotFound:
    # Channel doesn't exist, create it
    channel = client.create_channel("nonexistent", "agent-uuid")