Messages
Messages are the append-only data stream within a channel. Each message has a snowflake ID that encodes its creation timestamp, a JSON data payload, and optional file attachments. The DataClient provides methods for listing, creating, updating, deleting, and iterating through messages.
Listing Messages
Retrieve messages from a channel. Results can be filtered by time range and limited in count:
from pydoover.api import DataClient
from datetime import datetime, timezone
client = DataClient(profile="default")
# List the 10 most recent messages
messages = client.list_messages(
agent_id=12345,
channel_name="telemetry",
limit=10,
)
for msg in messages:
print(f"[{msg.id}] {msg.data}")
Filtering by Time Range
The before and after parameters accept either snowflake IDs or datetime objects. When you pass a datetime, it is automatically converted to a snowflake:
# Fetch messages from the last 24 hours
from datetime import timedelta
now = datetime.now(timezone.utc)
yesterday = now - timedelta(days=1)
messages = client.list_messages(
agent_id=12345,
channel_name="telemetry",
after=yesterday,
before=now,
)
Filtering by Field
Use field_names to request only specific fields from each message's data payload. This reduces response size when messages contain many fields:
# Only return the "temperature" and "humidity" fields
messages = client.list_messages(
agent_id=12345,
channel_name="telemetry",
field_names=["temperature", "humidity"],
)
Iterating Messages
For large result sets, use iter_messages to paginate through messages automatically. The iterator fetches pages of 50 messages (configurable via page_size) and yields them one at a time, using snowflake-based cursor pagination:
# Iterate through all messages in a time range
for msg in client.iter_messages(
agent_id=12345,
channel_name="telemetry",
after=yesterday,
):
print(msg.data["temperature"])
To load all matching messages into a list at once, use .collect():
# Collect all messages into a list
all_messages = client.iter_messages(
agent_id=12345,
channel_name="telemetry",
after=yesterday,
page_size=100,
).collect()
print(f"Total messages: {len(all_messages)}")
The async variant works with async for:
from pydoover.api import AsyncDataClient
async with AsyncDataClient(profile="default") as client:
async for msg in client.iter_messages(12345, "telemetry"):
print(msg.data)
# Or collect all into a list
all_msgs = await client.iter_messages(12345, "telemetry").collect()
Fetching a Single Message
Retrieve a specific message by its snowflake ID:
message = client.fetch_message(
agent_id=12345,
channel_name="telemetry",
message_id=7654321,
)
print(message.data)
Creating Messages
Post a new message to a channel. The data parameter is a dictionary that becomes the message payload:
# Create a simple message
message = client.create_message(
agent_id=12345,
channel_name="telemetry",
data={"temperature": 22.5, "humidity": 65, "pressure": 1013.25},
)
print(f"Created message: {message.id}")
With File Attachments
Messages can include file attachments using File objects:
from pydoover.models.data import File
# Read a file and attach it to the message
with open("sensor_log.csv", "rb") as f:
file_data = f.read()
message = client.create_message(
agent_id=12345,
channel_name="reports",
data={"report_type": "daily_summary"},
files=[
File(
filename="sensor_log.csv",
data=file_data,
content_type="text/csv",
)
],
)
With a Custom Timestamp
By default, the server assigns a timestamp. You can provide your own as a Unix timestamp in milliseconds:
import time
message = client.create_message(
agent_id=12345,
channel_name="telemetry",
data={"temperature": 22.5},
timestamp=int(time.time() * 1000),
)
Updating Messages
Update an existing message. By default, the update is a merge (PATCH) that only modifies the specified keys:
# Merge new fields into the message data
updated = client.update_message(
agent_id=12345,
channel_name="telemetry",
message_id=7654321,
data={"humidity": 70},
)
To replace the data entirely, set replace_data=True:
# Replace the entire message data
updated = client.update_message(
agent_id=12345,
channel_name="telemetry",
message_id=7654321,
data={"temperature": 23.0, "humidity": 70},
replace_data=True,
)
Deleting Messages
Remove a message by its snowflake ID:
client.delete_message(
agent_id=12345,
channel_name="telemetry",
message_id=7654321,
)
Fetching Timeseries Data
The fetch_timeseries method retrieves optimised timeseries data for specific fields across a channel's message history. This is more efficient than listing individual messages when you need columnar data for charting or analysis:
# Fetch timeseries data for temperature and humidity
ts = client.fetch_timeseries(
agent_id=12345,
channel_name="telemetry",
field_names=["temperature", "humidity"],
after=yesterday,
before=now,
limit=1000,
)
# TimeseriesResponse contains the structured timeseries data
print(ts)
Downloading Attachments
If a message has file attachments, download them using fetch_message_attachment:
# Fetch a message with attachments
message = client.fetch_message(
agent_id=12345,
channel_name="reports",
message_id=7654321,
)
# Download each attachment
for attachment in message.attachments:
content = client.fetch_message_attachment(attachment)
with open(attachment.filename, "wb") as f:
f.write(content)
Multi-Agent Batch Operations
When you need to query the same channel across multiple agents at once, the multi-agent methods are more efficient than making individual requests.
Fetching Messages Across Agents
# Fetch messages from the "telemetry" channel across three agents
batch = client.fetch_multi_agent_messages(
channel_name="telemetry",
agent_ids=[100, 200, 300],
after=yesterday,
limit=50,
)
# BatchMessageResponse contains results grouped by agent
for msg in batch.results:
print(f"Agent {msg.agent_id}: {msg.data}")
Iterating Multi-Agent Messages
The iter_multi_agent_messages method handles pagination automatically. It also splits time ranges into 2-day windows internally, because the multi-agent endpoint requires before and after to be at most 2 days apart:
# Iterate through multi-agent messages over a wide time range
for msg in client.iter_multi_agent_messages(
channel_name="telemetry",
agent_ids=[100, 200, 300],
after=datetime(2026, 1, 1, tzinfo=timezone.utc),
before=datetime(2026, 5, 1, tzinfo=timezone.utc),
):
print(msg.data)
Fetching Aggregates Across Agents
# Get the current aggregate for multiple agents
batch_agg = client.fetch_multi_agent_aggregates(
channel_name="device_status",
agent_ids=[100, 200, 300],
)
Method Reference
| Method | Returns | Description |
|---|---|---|
list_messages(...) | list[Message] | List messages with optional filters |
iter_messages(...) | MessageIterator | Paginating iterator over messages |
fetch_message(...) | Message | Fetch a single message by ID |
create_message(...) | Message | Create a new message (with optional files) |
update_message(...) | Message | None | Update a message (merge or replace) |
delete_message(...) | -- | Delete a message by ID |
fetch_timeseries(...) | TimeseriesResponse | Fetch columnar timeseries data |
fetch_message_attachment(...) | bytes | Download an attachment's content |
fetch_multi_agent_messages(...) | BatchMessageResponse | Messages across multiple agents |
iter_multi_agent_messages(...) | MultiAgentMessageIterator | Paginating multi-agent iterator |
fetch_multi_agent_aggregates(...) | BatchAggregateResponse | Aggregates across multiple agents |
Related Pages
- Channels -- channel management
- Data Client -- DataClient constructor and configuration
- Alarms -- alarm rules that trigger on message data