Channels and Messaging

Channels are the primary mechanism for storing and transmitting state in the Doover platform. They support aggregated state, message history, and pub/sub patterns.

Channel Types

The Cloud API provides three channel types:

| Type | Name Prefix | Description |
| --- | --- | --- |
| Channel | (none) | Base channel for state storage |
| Processor | # | Contains code for lambda-like function execution |
| Task | ! | Triggers a processor when messages are published |
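The prefix convention in the table can be checked with a few lines of plain Python. This is a minimal sketch only: `channel_type` is a hypothetical helper written for illustration, not part of pydoover, and it assumes the prefix is always the first character of the channel name.

```python
def channel_type(name: str) -> str:
    """Classify a channel name by its prefix (illustrative helper, not a pydoover API)."""
    if name.startswith("#"):
        return "Processor"
    if name.startswith("!"):
        return "Task"
    return "Channel"


for name in ("sensor-data", "#data-processor", "!process-data"):
    print(f"{name!r} -> {channel_type(name)}")
```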

Channel Class

The base Channel class represents a data channel in Doover.

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| key | str | The unique identifier (UUID) for the channel |
| id | str | Alias for key |
| name | str | The channel name |
| agent_id | str | The owner agent's unique identifier |

Properties

| Property | Type | Description |
| --- | --- | --- |
| aggregate | Any | The current aggregate data for the channel |
| last_message | Message | The most recent message in the channel |
| last_update_age | float | Age of the last message in seconds |

Methods

| Method | Returns | Description |
| --- | --- | --- |
| update() | None | Refreshes channel data from the server |
| fetch_agent() | Agent | Fetches the owning agent |
| fetch_aggregate() | dict or str | Fetches the aggregate data |
| fetch_messages(num_messages) | list[Message] | Fetches recent messages |
| fetch_messages_in_window(window_start, window_end) | list[Message] | Fetches messages in a time range |
| publish(data, ...) | None | Publishes data to the channel |
| update_from_file(path, mime_type) | None | Publishes file contents |
| get_tunnel_url(address) | str | Gets tunnel URL (tunnels channel only) |

Creating Channels

Regular Channels

from pydoover.cloud.api import Client

client = Client()

# Create a new channel (or get existing)
channel = client.create_channel("sensor-data", agent_id="agent-uuid")
print(f"Channel ID: {channel.key}")

Processor Channels

Processor channels contain code that runs when triggered by a task:

# Create a processor channel
processor = client.create_processor("data-processor", agent_id="agent-uuid")
# or
processor = client.create_channel("#data-processor", agent_id="agent-uuid")

# Update processor with code package
processor.update_from_package("/path/to/processor/package")

Task Channels

Task channels trigger processors when they receive messages:

# Create a task that triggers the processor
task = client.create_task(
    task_name="process-data",
    agent_id="agent-uuid",
    processor_id=processor.key
)

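# Subscribe the task to a data channel (here, the "sensor-data" channel created earlier)
data_channel = client.get_channel_named("sensor-data", agent_id="agent-uuid")
task.subscribe_to_channel(data_channel.key)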

Retrieving Channels

By ID

channel = client.get_channel("d1c7e8e3-f47b-4c68-86d7-65054d9e97d3")

By Name

channel = client.get_channel_named("sensor-data", agent_id="agent-uuid")

Publishing Data

Using Channel Object

channel = client.get_channel("channel-uuid")

# Publish JSON data
channel.publish({
    "temperature": 25.5,
    "humidity": 60,
    "status": "online"
})

# Publish with options
channel.publish(
    data={"value": 42},
    save_log=True,           # Save to message history
    log_aggregate=False,     # Don't log aggregate changes
    override_aggregate=True  # Replace entire aggregate
)

Using Client Methods

# Publish by channel ID
client.publish_to_channel(
    channel_id="channel-uuid",
    data={"key": "value"}
)

# Publish by channel name
client.publish_to_channel_name(
    agent_id="agent-uuid",
    channel_name="sensor-data",
    data={"key": "value"}
)

Publishing Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| save_log | bool | True | Save message to channel history |
| log_aggregate | bool | False | Log aggregate data after publish |
| override_aggregate | bool | False | Replace aggregate instead of merging |
| timestamp | datetime | None | Custom timestamp for the message |

from datetime import datetime, timezone

# Publish with custom timestamp
channel.publish(
    data={"reading": 42},
    timestamp=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
)

# Publish without saving to log (aggregate only)
channel.publish(
    data={"status": "active"},
    save_log=False
)

Working with Aggregates

The aggregate represents the current state of a channel, merged from all published messages:

channel = client.get_channel("channel-uuid")

# Access cached aggregate
print(channel.aggregate)

# Fetch fresh aggregate from server
fresh_aggregate = channel.fetch_aggregate()
print(fresh_aggregate)

Override vs Merge

By default, published data merges with the existing aggregate:

# Current aggregate: {"a": 1, "b": 2}
channel.publish({"b": 3, "c": 4})
# New aggregate: {"a": 1, "b": 3, "c": 4}

Use override_aggregate=True to replace entirely:

# Current aggregate: {"a": 1, "b": 2}
channel.publish({"x": 10}, override_aggregate=True)
# New aggregate: {"x": 10}
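The two behaviours above can be modelled locally without a server. The sketch below is an assumption-laden illustration: `apply_publish` is a hypothetical function, and it only reproduces the top-level key merge shown in the examples. How the platform treats nested dictionaries is not stated here, so the sketch does not attempt it.

```python
def apply_publish(aggregate: dict, data: dict, override_aggregate: bool = False) -> dict:
    """Return the aggregate after a publish (local model, top-level keys only)."""
    if override_aggregate:
        # Replace: the published data becomes the whole aggregate.
        return dict(data)
    # Merge: keep existing keys, overwrite or add the published ones.
    merged = dict(aggregate)
    merged.update(data)
    return merged


current = {"a": 1, "b": 2}
merged = apply_publish(current, {"b": 3, "c": 4})
replaced = apply_publish(current, {"x": 10}, override_aggregate=True)

print(merged)    # {'a': 1, 'b': 3, 'c': 4}
print(replaced)  # {'x': 10}
```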

Message Class

The Message class represents a message in a channel.

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| id | str | Unique message identifier |
| channel_id | str | Channel the message belongs to |
| agent_id | str | Agent that sent the message |
| channel_name | str | Name of the channel |

Properties

| Property | Type | Description |
| --- | --- | --- |
| timestamp | datetime | Message creation time (UTC) |
| age | float | Age of the message in seconds |
Methods

| Method | Returns | Description |
| --- | --- | --- |
| update() | None | Refreshes message data from server |
| delete() | None | Deletes the message |
| fetch_payload() | dict or str | Fetches the message payload |
| to_dict() | dict | Converts message to dictionary |
| get_age() | float | Returns message age in seconds |
| get_timestamp() | datetime | Returns message timestamp |
Retrieving Messages

Recent Messages

# Get last 10 messages
messages = client.get_channel_messages("channel-uuid", num_messages=10)

for msg in messages:
    print(f"[{msg.timestamp}] {msg.fetch_payload()}")

Messages in Time Window

from datetime import datetime, timedelta

end = datetime.now()
start = end - timedelta(hours=24)

messages = client.get_channel_messages_in_window("channel-uuid", start, end)
print(f"Found {len(messages)} messages in the last 24 hours")
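Since message timestamps are described as UTC, it may be safer to build the window from timezone-aware UTC datetimes rather than the naive local time returned by `datetime.now()`. The sketch below assumes that; `utc_window` is a hypothetical helper, and whether the client accepts aware datetimes is not confirmed here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def utc_window(hours: float, end: Optional[datetime] = None) -> Tuple[datetime, datetime]:
    """Return (start, end) covering the last `hours` hours, as aware UTC datetimes."""
    if end is None:
        end = datetime.now(timezone.utc)
    return end - timedelta(hours=hours), end


# A fixed end time keeps the example reproducible.
start, end = utc_window(24, end=datetime(2024, 1, 16, 12, 0, tzinfo=timezone.utc))
print(start.isoformat(), "->", end.isoformat())
```

The resulting `start` and `end` could then be passed to `get_channel_messages_in_window` in place of the naive values.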

Single Message

message = client.get_message("channel-uuid", "message-uuid")
print(message.fetch_payload())

Using Channel Object

from datetime import datetime, timedelta

channel = client.get_channel("channel-uuid")

# Fetch recent messages
messages = channel.fetch_messages(num_messages=20)

# Fetch messages in time window (the last hour)
now = datetime.now()
messages = channel.fetch_messages_in_window(
    window_start=now - timedelta(hours=1),
    window_end=now
)

Message Operations

Accessing Payload

message = messages[0]

# Fetch payload (loads from server if not cached)
payload = message.fetch_payload()
print(payload)

Deleting Messages

message = client.get_message("channel-uuid", "message-uuid")
message.delete()

Message from CSV Export

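# Message is assumed to be importable from the same module as Client
from pydoover.cloud.api import Message

# Import messages from a CSV export file
messages = Message.from_csv_export(client, "/path/to/export.csv")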

for msg in messages:
    print(f"{msg.channel_name}: {msg.fetch_payload()}")

Processor Class

The Processor class extends Channel for processor-specific operations.

Methods

| Method | Description |
| --- | --- |
| update_from_package(path) | Upload a package directory as processor code |
| invoke_locally(...) | Execute processor code locally for testing |

Updating Processor Code

# Look the processor up by name; get_channel() expects a channel ID
processor = client.get_channel_named("#my-processor", agent_id="agent-uuid")

# Upload new processor code
processor.update_from_package("/path/to/processor/src")

The package directory is zipped and uploaded as base64-encoded data.
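The zip-then-base64 step can be sketched with the standard library alone. This is an illustration of the general technique, not pydoover's actual code: `package_to_base64` is a hypothetical name, and details such as the compression method and archive layout are assumptions.

```python
import base64
import io
import tempfile
import zipfile
from pathlib import Path


def package_to_base64(package_dir: str) -> str:
    """Zip every file under `package_dir` in memory and return it base64-encoded."""
    root = Path(package_dir)
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(root.rglob("*")):
            if path.is_file():
                # Store paths relative to the package root.
                archive.write(path, path.relative_to(root).as_posix())
    return base64.b64encode(buffer.getvalue()).decode("ascii")


# Build a throwaway package, encode it, then decode it again to check the contents.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "main.py").write_text("print('hello')\n")
    encoded = package_to_base64(tmp)

with zipfile.ZipFile(io.BytesIO(base64.b64decode(encoded))) as archive:
    names = archive.namelist()
print(names)  # ['main.py']
```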

Local Invocation

For testing, you can invoke processor code locally:

processor.invoke_locally(
    package_dir="/path/to/processor/src",
    agent_id="agent-uuid",
    access_token=client.access_token.token,
    api_endpoint="https://my.doover.dev",
    package_config={"setting": "value"},
    msg_obj={"trigger": "data"},
    agent_settings={"deployment_config": {}}
)

Task Class

The Task class extends Channel for task-specific operations.

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| processor_key | str | Key of the associated processor |
| processor_id | str | Alias for processor_key |

Methods

| Method | Returns | Description |
| --- | --- | --- |
| fetch_processor() | Processor | Fetches the associated processor |
| subscribe_to_channel(channel_id) | bool | Subscribes task to a channel |
| unsubscribe_from_channel(channel_id) | bool | Unsubscribes task from a channel |
| invoke_locally(...) | None | Executes task locally for testing |

Task Subscriptions

Tasks can subscribe to channels to trigger when those channels receive messages:

# Look the task up by name; get_channel() expects a channel ID
task = client.get_channel_named("!my-task", agent_id="agent-uuid")

# Subscribe to a data channel
task.subscribe_to_channel("data-channel-uuid")

# Unsubscribe
task.unsubscribe_from_channel("data-channel-uuid")

Using Client Methods

client.subscribe_to_channel(
    channel_id="data-channel-uuid",
    task_id="task-uuid"
)

client.unsubscribe_from_channel(
    channel_id="data-channel-uuid",
    task_id="task-uuid"
)
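To make the subscription model concrete, here is a toy in-memory version of the channel, task, and processor relationship. It is purely illustrative: `ToyBroker` is a hypothetical class, not part of pydoover, and it only mimics the idea that a publish to a channel triggers each subscribed task's processor.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class ToyBroker:
    """In-memory stand-in for channels, tasks, and the processors they trigger."""

    def __init__(self) -> None:
        self._subscriptions: Dict[str, List[str]] = defaultdict(list)
        self._processors: Dict[str, Callable[[dict], None]] = {}

    def create_task(self, task_id: str, processor: Callable[[dict], None]) -> None:
        self._processors[task_id] = processor

    def subscribe_to_channel(self, channel_id: str, task_id: str) -> None:
        if task_id not in self._subscriptions[channel_id]:
            self._subscriptions[channel_id].append(task_id)

    def unsubscribe_from_channel(self, channel_id: str, task_id: str) -> None:
        if task_id in self._subscriptions[channel_id]:
            self._subscriptions[channel_id].remove(task_id)

    def publish(self, channel_id: str, data: dict) -> None:
        # Every task subscribed to the channel runs its processor on the message.
        for task_id in list(self._subscriptions[channel_id]):
            self._processors[task_id](data)


seen: List[dict] = []
broker = ToyBroker()
broker.create_task("task-uuid", seen.append)
broker.subscribe_to_channel("data-channel-uuid", "task-uuid")
broker.publish("data-channel-uuid", {"reading": 1})  # triggers the processor
broker.unsubscribe_from_channel("data-channel-uuid", "task-uuid")
broker.publish("data-channel-uuid", {"reading": 2})  # no longer triggers it
print(seen)  # [{'reading': 1}]
```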

Publishing Files

Channels can store file data as base64-encoded content:

channel = client.get_channel("channel-uuid")

# Upload a file
channel.update_from_file("/path/to/image.png")

# Specify MIME type explicitly
channel.update_from_file("/path/to/data.bin", mime_type="application/octet-stream")

The file is stored in the aggregate as:

{
    "output_type": "image/png",
    "output": "base64-encoded-data..."
}
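A payload of this shape can be built by hand with the standard library, which may help when inspecting or decoding stored files. This is a sketch under assumptions: `file_to_aggregate` is a hypothetical helper, and both the MIME-type guessing and the `application/octet-stream` fallback are choices made for the example, not documented behaviour.

```python
import base64
import mimetypes
import tempfile
from pathlib import Path
from typing import Optional


def file_to_aggregate(path: str, mime_type: Optional[str] = None) -> dict:
    """Build an {"output_type", "output"} payload from a file on disk."""
    if mime_type is None:
        guessed, _ = mimetypes.guess_type(path)
        mime_type = guessed or "application/octet-stream"  # assumed fallback
    data = Path(path).read_bytes()
    return {
        "output_type": mime_type,
        "output": base64.b64encode(data).decode("ascii"),
    }


with tempfile.TemporaryDirectory() as tmp:
    image = Path(tmp) / "image.png"
    image.write_bytes(b"\x89PNG\r\n")
    payload = file_to_aggregate(str(image))

print(payload["output_type"])  # image/png
```

Decoding goes the other way: `base64.b64decode(payload["output"])` recovers the original bytes.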

Tunnel URLs

The special tunnels channel provides remote access URLs:

tunnels_channel = client.get_channel_named("tunnels", agent_id="agent-uuid")

# Get URL for a specific address
url = tunnels_channel.get_tunnel_url("192.168.1.100:8080")
if url:
    print(f"Tunnel URL: {url}")

Complete Example

from pydoover.cloud.api import Client
from datetime import datetime, timedelta

client = Client()
agent_id = "your-agent-uuid"

# Create a sensor data channel
sensor_channel = client.create_channel("temperature-readings", agent_id)

# Publish some readings
for i in range(5):
    sensor_channel.publish({
        "reading_id": i,
        "temperature": 20 + i * 0.5,
        "unit": "celsius"
    })

# Create a processor for data analysis
processor = client.create_processor("analyze-temperature", agent_id)

# Create a task that triggers the processor
task = client.create_task("temperature-analyzer", agent_id, processor.key)

# Subscribe task to the sensor channel
task.subscribe_to_channel(sensor_channel.key)

# Fetch recent messages
messages = sensor_channel.fetch_messages(num_messages=10)
print(f"Last {len(messages)} readings:")
for msg in messages:
    payload = msg.fetch_payload()
    print(f"  {msg.timestamp}: {payload['temperature']}C")

# Get aggregate state (fetched fresh, since the cached value may predate the publishes above)
print(f"\nCurrent aggregate: {sensor_channel.fetch_aggregate()}")

# Check last update time (compare against None so an age of 0.0 is still reported)
age = sensor_channel.last_update_age
if age is not None:
    print(f"Last update: {age:.1f} seconds ago")

Error Handling

from pydoover.cloud.api import Client, NotFound, Forbidden

client = Client()

try:
    channel = client.get_channel("invalid-uuid")
except NotFound:
    print("Channel not found")
except Forbidden:
    print("Access denied")

try:
    channel = client.get_channel_named("nonexistent", "agent-uuid")
except NotFound:
    # Channel doesn't exist, create it
    channel = client.create_channel("nonexistent", "agent-uuid")
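The get-or-create pattern above can be wrapped in a small helper. The sketch below runs without pydoover: `StubClient` and `StubNotFound` are stand-ins invented for the example, and `get_or_create_channel` is a hypothetical helper. With the real library you would pass the actual client and the `NotFound` exception instead.

```python
class StubNotFound(Exception):
    """Stand-in for pydoover's NotFound so the sketch runs without the library."""


class StubClient:
    """Minimal fake exposing only the two client methods the helper needs."""

    def __init__(self) -> None:
        self.channels = {}

    def get_channel_named(self, name, agent_id):
        try:
            return self.channels[(agent_id, name)]
        except KeyError:
            raise StubNotFound(name) from None

    def create_channel(self, name, agent_id):
        return self.channels.setdefault((agent_id, name), {"name": name, "agent_id": agent_id})


def get_or_create_channel(client, name, agent_id, not_found):
    """Return (channel, created), creating the channel only if lookup raises `not_found`."""
    try:
        return client.get_channel_named(name, agent_id), False
    except not_found:
        return client.create_channel(name, agent_id), True


client = StubClient()
first, created_first = get_or_create_channel(client, "sensor-data", "agent-uuid", StubNotFound)
second, created_second = get_or_create_channel(client, "sensor-data", "agent-uuid", StubNotFound)
print(created_first, created_second)  # True False
```

Since `create_channel` is described earlier as creating a new channel or returning the existing one, the helper is mainly useful when you need to know whether the channel was newly created.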