Messages

Messages are the append-only data stream within a channel. Each message has a snowflake ID that encodes its creation timestamp, a JSON data payload, and optional file attachments. The DataClient provides methods for listing, creating, updating, deleting, and iterating through messages.

Listing Messages

Retrieve messages from a channel. Results can be filtered by time range and limited in count:

from pydoover.api import DataClient
from datetime import datetime, timezone

client = DataClient(profile="default")

# List the 10 most recent messages
messages = client.list_messages(
    agent_id=12345,
    channel_name="telemetry",
    limit=10,
)

for msg in messages:
    print(f"[{msg.id}] {msg.data}")

Filtering by Time Range

The before and after parameters accept either snowflake IDs or datetime objects. When you pass a datetime, it is automatically converted to a snowflake:

# Fetch messages from the last 24 hours
from datetime import timedelta

now = datetime.now(timezone.utc)
yesterday = now - timedelta(days=1)

messages = client.list_messages(
    agent_id=12345,
    channel_name="telemetry",
    after=yesterday,
    before=now,
)
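To make the datetime-to-snowflake conversion concrete, here is a minimal sketch. The source does not state the bit layout or epoch of Doover snowflakes, so the layout below (a millisecond timestamp stored above the lowest 22 bits, counted from an epoch of 2015-01-01 UTC) is an assumption, and the helper names and constants are hypothetical rather than part of pydoover:

```python
from datetime import datetime, timedelta, timezone

# ASSUMPTION: both constants are illustrative; the real epoch and bit layout
# used by the service are not given in this page.
ASSUMED_EPOCH_MS = 1_420_070_400_000  # 2015-01-01T00:00:00Z in Unix milliseconds
TIMESTAMP_SHIFT = 22                  # low bits assumed reserved for worker/sequence

_UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def datetime_to_snowflake(dt: datetime) -> int:
    """Return the smallest snowflake whose timestamp equals dt (hypothetical helper)."""
    unix_ms = (dt - _UNIX_EPOCH) // timedelta(milliseconds=1)
    return (unix_ms - ASSUMED_EPOCH_MS) << TIMESTAMP_SHIFT


def snowflake_to_datetime(snowflake: int) -> datetime:
    """Recover the creation time embedded in a snowflake (hypothetical helper)."""
    unix_ms = (snowflake >> TIMESTAMP_SHIFT) + ASSUMED_EPOCH_MS
    return _UNIX_EPOCH + timedelta(milliseconds=unix_ms)


cutoff = datetime(2026, 1, 1, tzinfo=timezone.utc)
cursor = datetime_to_snowflake(cutoff)
round_trip = snowflake_to_datetime(cursor)
```

Because the timestamp occupies the high bits under this layout, comparing two snowflakes numerically also compares their creation times, which is what lets an ID act as a time-range bound.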

Filtering by Field

Use field_names to request only specific fields from each message's data payload. This reduces response size when messages contain many fields:

# Only return the "temperature" and "humidity" fields
messages = client.list_messages(
    agent_id=12345,
    channel_name="telemetry",
    field_names=["temperature", "humidity"],
)
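For intuition, the effect of field_names on a single payload resembles the client-side projection sketched below. This is only an illustration: project_fields is a hypothetical helper, and the source does not say whether the server omits missing fields or returns them as null (the sketch omits them):

```python
def project_fields(data: dict, field_names: list[str]) -> dict:
    """Keep only the requested keys; keys absent from data are skipped (assumed behaviour)."""
    return {name: data[name] for name in field_names if name in data}


payload = {"temperature": 22.5, "humidity": 65, "pressure": 1013.25}
trimmed = project_fields(payload, ["temperature", "humidity", "wind_speed"])
```

Filtering on the server is still preferable, because the unwanted fields never travel over the network.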

Iterating Messages

For large result sets, use iter_messages to paginate through messages automatically. The iterator fetches pages of 50 messages (configurable via page_size) and yields them one at a time, using snowflake-based cursor pagination:

# Iterate through all messages in a time range
for msg in client.iter_messages(
    agent_id=12345,
    channel_name="telemetry",
    after=yesterday,
):
    print(msg.data["temperature"])

To load all matching messages into a list at once, use .collect():

# Collect all messages into a list
all_messages = client.iter_messages(
    agent_id=12345,
    channel_name="telemetry",
    after=yesterday,
    page_size=100,
).collect()

print(f"Total messages: {len(all_messages)}")
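The cursor pagination described above can be pictured with a small generator. This is a sketch of the general technique, not the pydoover implementation: iter_pages, fake_fetch, and the in-memory STORE are hypothetical stand-ins, and the newest-first ordering is an assumption:

```python
from typing import Callable, Iterator, Optional

# Hypothetical backend: 120 messages, newest (highest ID) first.
STORE = [{"id": i} for i in range(120, 0, -1)]


def fake_fetch(before: Optional[int], limit: int) -> list[dict]:
    """Stand-in for one paged request: return up to `limit` messages older than `before`."""
    rows = [m for m in STORE if before is None or m["id"] < before]
    return rows[:limit]


def iter_pages(
    fetch_page: Callable[[Optional[int], int], list[dict]],
    page_size: int = 50,
) -> Iterator[dict]:
    """Yield messages one at a time, requesting a new page only when needed."""
    cursor: Optional[int] = None
    while True:
        page = fetch_page(cursor, page_size)
        yield from page
        if len(page) < page_size:
            return  # a short (or empty) page means there is nothing left
        cursor = page[-1]["id"]  # the last ID seen becomes the next cursor


ids = [m["id"] for m in iter_pages(fake_fetch, page_size=50)]
```

Using the last message ID as the cursor, rather than a numeric offset, keeps the iteration stable when new messages arrive while it is running.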

The async variant works with async for:

import asyncio

from pydoover.api import AsyncDataClient


async def main():
    async with AsyncDataClient(profile="default") as client:
        async for msg in client.iter_messages(12345, "telemetry"):
            print(msg.data)

        # Or collect all into a list
        all_msgs = await client.iter_messages(12345, "telemetry").collect()


# "async with" and "async for" are only valid inside a coroutine
asyncio.run(main())

Fetching a Single Message

Retrieve a specific message by its snowflake ID:

message = client.fetch_message(
    agent_id=12345,
    channel_name="telemetry",
    message_id=7654321,
)

print(message.data)

Creating Messages

Post a new message to a channel. The data parameter is a dictionary that becomes the message payload:

# Create a simple message
message = client.create_message(
    agent_id=12345,
    channel_name="telemetry",
    data={"temperature": 22.5, "humidity": 65, "pressure": 1013.25},
)

print(f"Created message: {message.id}")

With File Attachments

Messages can include file attachments using File objects:

from pydoover.models.data import File

# Read a file and attach it to the message
with open("sensor_log.csv", "rb") as f:
    file_data = f.read()

message = client.create_message(
    agent_id=12345,
    channel_name="reports",
    data={"report_type": "daily_summary"},
    files=[
        File(
            filename="sensor_log.csv",
            data=file_data,
            content_type="text/csv",
        )
    ],
)

With a Custom Timestamp

By default, the server assigns a timestamp. You can provide your own as a Unix timestamp in milliseconds:

import time

message = client.create_message(
    agent_id=12345,
    channel_name="telemetry",
    data={"temperature": 22.5},
    timestamp=int(time.time() * 1000),
)
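When the reading time is already available as a datetime, it can be converted to Unix milliseconds before being passed as timestamp. A minimal sketch using only the standard library; to_unix_ms is a hypothetical helper, not a pydoover function:

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to whole Unix milliseconds."""
    if dt.tzinfo is None:
        # Naive datetimes are ambiguous, so reject them rather than guess a zone.
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (dt - UNIX_EPOCH) // timedelta(milliseconds=1)


reading_time = datetime(2026, 1, 1, tzinfo=timezone.utc)
timestamp_ms = to_unix_ms(reading_time)
```

The resulting integer can then be supplied as the timestamp argument shown above.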

Updating Messages

Update an existing message. By default, the update is a merge (PATCH) that only modifies the specified keys:

# Merge new fields into the message data
updated = client.update_message(
    agent_id=12345,
    channel_name="telemetry",
    message_id=7654321,
    data={"humidity": 70},
)

To replace the data entirely, set replace_data=True:

# Replace the entire message data
updated = client.update_message(
    agent_id=12345,
    channel_name="telemetry",
    message_id=7654321,
    data={"temperature": 23.0, "humidity": 70},
    replace_data=True,
)
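The difference between the two modes can be illustrated with plain dictionaries. This is a sketch of the semantics described above, not the server's code: apply_update is a hypothetical helper, and the shallow (top-level) merge is an assumption, since the source does not say how nested objects are merged:

```python
def apply_update(current: dict, data: dict, replace_data: bool = False) -> dict:
    """Return the payload that would result from an update (illustrative only)."""
    if replace_data:
        return dict(data)        # replace: previous keys are discarded
    return {**current, **data}   # merge: only the supplied keys change


stored = {"temperature": 22.5, "humidity": 65, "pressure": 1013.25}

merged = apply_update(stored, {"humidity": 70})
replaced = apply_update(
    stored, {"temperature": 23.0, "humidity": 70}, replace_data=True
)
```

In the merge case pressure survives untouched, whereas in the replace case it is gone because it was not included in the new data.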

Deleting Messages

Remove a message by its snowflake ID:

client.delete_message(
    agent_id=12345,
    channel_name="telemetry",
    message_id=7654321,
)

Fetching Timeseries Data

The fetch_timeseries method retrieves optimised timeseries data for specific fields across a channel's message history. This is more efficient than listing individual messages when you need columnar data for charting or analysis:

# Fetch timeseries data for temperature and humidity
ts = client.fetch_timeseries(
    agent_id=12345,
    channel_name="telemetry",
    field_names=["temperature", "humidity"],
    after=yesterday,
    before=now,
    limit=1000,
)

# TimeseriesResponse contains the structured timeseries data
print(ts)
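To show what "columnar" means in contrast to a list of messages, the sketch below pivots row-oriented readings into one list per field. The structure of TimeseriesResponse is not described here, so this is not its actual layout; to_columns and the sample rows are hypothetical:

```python
def to_columns(rows: list[tuple[int, dict]], field_names: list[str]) -> dict[str, list]:
    """Pivot (timestamp_ms, data) rows into parallel columns, using None for gaps."""
    columns: dict[str, list] = {"timestamp": []}
    for name in field_names:
        columns[name] = []
    for timestamp_ms, data in rows:
        columns["timestamp"].append(timestamp_ms)
        for name in field_names:
            columns[name].append(data.get(name))
    return columns


rows = [
    (1767225600000, {"temperature": 22.5, "humidity": 65}),
    (1767225660000, {"temperature": 22.7}),
]
columns = to_columns(rows, ["temperature", "humidity"])
```

Each column lines up index-for-index with the timestamp column, which is the shape most charting and analysis tools expect.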

Downloading Attachments

If a message has file attachments, download them using fetch_message_attachment:

# Fetch a message with attachments
message = client.fetch_message(
    agent_id=12345,
    channel_name="reports",
    message_id=7654321,
)

# Download each attachment
for attachment in message.attachments:
    content = client.fetch_message_attachment(attachment)
    with open(attachment.filename, "wb") as f:
        f.write(content)

Multi-Agent Batch Operations

When you need to query the same channel across multiple agents at once, the multi-agent methods are more efficient than making individual requests.

Fetching Messages Across Agents

# Fetch messages from the "telemetry" channel across three agents
batch = client.fetch_multi_agent_messages(
    channel_name="telemetry",
    agent_ids=[100, 200, 300],
    after=yesterday,
    limit=50,
)

# BatchMessageResponse contains results grouped by agent
for msg in batch.results:
    print(f"Agent {msg.agent_id}: {msg.data}")

Iterating Multi-Agent Messages

The iter_multi_agent_messages method handles pagination automatically. It also splits time ranges into 2-day windows internally, because the multi-agent endpoint requires before and after to be at most 2 days apart:

# Iterate through multi-agent messages over a wide time range
for msg in client.iter_multi_agent_messages(
    channel_name="telemetry",
    agent_ids=[100, 200, 300],
    after=datetime(2026, 1, 1, tzinfo=timezone.utc),
    before=datetime(2026, 5, 1, tzinfo=timezone.utc),
):
    print(msg.data)
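The window splitting mentioned above can be sketched as follows. This illustrates the general idea only; split_windows is a hypothetical helper, and the iterator's real internals (for example, how it treats window boundaries) are not documented here:

```python
from datetime import datetime, timedelta, timezone


def split_windows(
    after: datetime,
    before: datetime,
    max_span: timedelta = timedelta(days=2),
) -> list[tuple[datetime, datetime]]:
    """Cut [after, before) into consecutive windows no longer than max_span."""
    windows = []
    start = after
    while start < before:
        end = min(start + max_span, before)  # the final window may be shorter
        windows.append((start, end))
        start = end
    return windows


windows = split_windows(
    after=datetime(2026, 1, 1, tzinfo=timezone.utc),
    before=datetime(2026, 1, 6, tzinfo=timezone.utc),
)
```

A five-day range therefore becomes three requests: two full 2-day windows and one 1-day remainder.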

Fetching Aggregates Across Agents

# Get the current aggregate for multiple agents
batch_agg = client.fetch_multi_agent_aggregates(
    channel_name="device_status",
    agent_ids=[100, 200, 300],
)

Method Reference

Method                             Returns                    Description
list_messages(...)                 list[Message]              List messages with optional filters
iter_messages(...)                 MessageIterator            Paginating iterator over messages
fetch_message(...)                 Message                    Fetch a single message by ID
create_message(...)                Message                    Create a new message (with optional files)
update_message(...)                Message | None             Update a message (merge or replace)
delete_message(...)                --                         Delete a message by ID
fetch_timeseries(...)              TimeseriesResponse         Fetch columnar timeseries data
fetch_message_attachment(...)      bytes                      Download an attachment's content
fetch_multi_agent_messages(...)    BatchMessageResponse       Messages across multiple agents
iter_multi_agent_messages(...)     MultiAgentMessageIterator  Paginating multi-agent iterator
fetch_multi_agent_aggregates(...)  BatchAggregateResponse     Aggregates across multiple agents

Related Pages

  • Channels -- channel management
  • Data Client -- DataClient constructor and configuration
  • Alarms -- alarm rules that trigger on message data