Processor Data Client
ProcessorDataClient is the API client available inside every processor as self.api. It extends AsyncDataClient with processor-specific behaviour: automatic agent_id defaulting, anti-recursion protection for invoking channels, and connection management helpers.
Accessing the Client
The client is created automatically when the processor is instantiated and is available as self.api inside any handler or lifecycle method.
```python
from pydoover.processor import Application
from pydoover.models import MessageCreateEvent


class MyProcessor(Application):
    async def on_message_create(self, event: MessageCreateEvent):
        # self.api is the ProcessorDataClient
        channels = await self.api.list_channels()
        for ch in channels:
            print(f"Channel: {ch.name}")
```
You do not need to create, configure, or close the client yourself. The framework handles all of this during the invocation lifecycle.
Agent ID Defaulting
In the standard AsyncDataClient, most methods require an agent_id parameter. ProcessorDataClient removes this requirement by defaulting to the processor's own agent ID (set during the setup phase).
```python
# With a regular AsyncDataClient, you must pass agent_id
channels = await client.list_channels(agent_id=12345)

# With ProcessorDataClient inside a processor, agent_id is optional
channels = await self.api.list_channels()
```
You can still pass an explicit agent_id to operate on a different agent. This is useful for processors with extended permissions that need to read or write data on other agents.
```python
# Read data from a different agent
other_channels = await self.api.list_channels(agent_id=67890)
```
The resolution logic is straightforward: if you pass an agent_id, it is used; otherwise, the client falls back to self.api.agent_id, which is set from the processor's context during setup.
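To make the fallback concrete, here is a minimal sketch of that resolution rule. It is not pydoover's source: AgentScopedClient and _resolve_agent_id are hypothetical names, and raising a RuntimeError when neither value is available is an assumption of this sketch rather than documented behaviour.

```python
from typing import Optional


class AgentScopedClient:
    """Hypothetical stand-in illustrating agent_id defaulting (not pydoover code)."""

    def __init__(self) -> None:
        # Populated from the processor's context during setup
        self.agent_id: Optional[int] = None

    def _resolve_agent_id(self, agent_id: Optional[int] = None) -> int:
        # An explicitly passed agent_id always wins
        if agent_id is not None:
            return agent_id
        # Otherwise fall back to the processor's own agent (assumed to raise if unset)
        if self.agent_id is None:
            raise RuntimeError("agent_id was not passed and has not been set during setup")
        return self.agent_id


client = AgentScopedClient()
client.agent_id = 12345  # normally set by the framework during setup
print(client._resolve_agent_id())       # 12345
print(client._resolve_agent_id(67890))  # 67890
```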
Anti-Recursion Protection
When a processor is triggered by a MessageCreateEvent or AggregateUpdateEvent, the framework records the name of the channel that caused the invocation. If the handler then tries to write back to that same channel, the client raises a RuntimeError to prevent infinite loops.
```python
async def on_message_create(self, event: MessageCreateEvent):
    # This will raise RuntimeError because "sensor_readings"
    # is the channel that triggered this invocation
    await self.api.create_message("sensor_readings", {"processed": True})
```
This protection applies to three write operations:
- create_message
- update_channel_aggregate
- update_message
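The check can be modelled as a guard that runs before each protected write. The sketch below is illustrative only: InvokingChannelGuard is a hypothetical class, it records writes in a list instead of calling the API, only two of the three protected methods are modelled (update_message would follow the same pattern), and the error message wording is invented.

```python
from typing import Any, Optional


class InvokingChannelGuard:
    """Hypothetical model of the anti-recursion check (not pydoover's source)."""

    def __init__(self, invoking_channel: Optional[str]) -> None:
        # Name of the channel whose event triggered this invocation, if any
        self.invoking_channel = invoking_channel
        # Writes that passed the check, recorded instead of sent to the API
        self.writes: list[tuple[str, str, Any]] = []

    def _check(self, channel_name: str, allow_invoking_channel: bool) -> None:
        # Block writes to the triggering channel unless explicitly allowed
        if (
            not allow_invoking_channel
            and self.invoking_channel is not None
            and channel_name == self.invoking_channel
        ):
            raise RuntimeError(
                f"refusing to write to invoking channel {channel_name!r}; "
                "pass allow_invoking_channel=True to override"
            )

    def create_message(self, channel_name: str, data: dict, *, allow_invoking_channel: bool = False) -> None:
        self._check(channel_name, allow_invoking_channel)
        self.writes.append(("create_message", channel_name, data))

    def update_channel_aggregate(self, channel_name: str, data: dict, *, allow_invoking_channel: bool = False) -> None:
        self._check(channel_name, allow_invoking_channel)
        self.writes.append(("update_channel_aggregate", channel_name, data))


guard = InvokingChannelGuard(invoking_channel="sensor_readings")
guard.create_message("daily_summary", {"ok": True})  # different channel: allowed
guard.update_channel_aggregate("sensor_readings", {"n": 1}, allow_invoking_channel=True)  # explicit override
try:
    guard.create_message("sensor_readings", {"processed": True})
except RuntimeError as exc:
    print(exc)
```

In this sketch the comparison is a plain string match on the channel name, so writes to any other channel pass straight through, and an invocation with no invoking channel (for example, a scheduled run) is never blocked.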
Overriding the Protection
In some cases, you intentionally need to write back to the invoking channel (e.g., updating an aggregate on the same channel that emitted a message). Pass allow_invoking_channel=True to bypass the check.
```python
async def on_message_create(self, event: MessageCreateEvent):
    # Explicitly allow writing to the invoking channel
    await self.api.update_channel_aggregate(
        event.channel.name,
        {"last_processed": "2026-01-15T10:30:00Z"},
        allow_invoking_channel=True,
    )
```
When you override the anti-recursion protection, you are responsible for ensuring that the write does not trigger another invocation that writes back again. A common safe pattern is to write to the channel's aggregate (which may not trigger a subscription) rather than creating a new message.
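One general way to meet that responsibility is to make the write idempotent: skip it when it would not change anything, so a re-triggered invocation becomes a no-op. This is a common technique, not something the framework does for you. needs_update is a hypothetical helper, and it compares only top-level keys (a key missing from the current data is treated as None).

```python
from typing import Any


def needs_update(current: dict[str, Any], patch: dict[str, Any]) -> bool:
    """Return True if merging `patch` into `current` would change any top-level value."""
    # Keys absent from `current` compare as None
    return any(current.get(key) != value for key, value in patch.items())


current = {"last_processed": "2026-01-15T10:30:00Z", "count": 7}
print(needs_update(current, {"count": 7}))  # False: nothing to write, the loop stops here
print(needs_update(current, {"count": 8}))  # True: a real change, safe to write once
```

In a handler, you would compare the current aggregate data against the payload and only call update_channel_aggregate(..., allow_invoking_channel=True) when the helper reports a change.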
Tag Values Channel
The tag_values channel receives special handling. Writing to it is allowed as long as the data is scoped to the processor's own app key. If you attempt to write tag data for a different app key, the client raises a RuntimeError.
This means normal tag operations via self.set_tag() work without any override, because the tag manager always scopes writes to the current app key.
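The scope rule can be pictured as a check on the keys being written. This sketch assumes, purely for illustration, that a tag_values payload is a dict keyed by app key at the top level; the real payload layout and the exact check in pydoover may differ, and check_tag_scope is a hypothetical name.

```python
def check_tag_scope(data: dict, app_key: str) -> None:
    """Hypothetical check: raise if any top-level key belongs to another app."""
    foreign = sorted(key for key in data if key != app_key)
    if foreign:
        raise RuntimeError(f"cannot write tags for other app keys: {foreign}")


check_tag_scope({"my_app": {"pump_on": True}}, app_key="my_app")  # allowed
try:
    check_tag_scope({"other_app": {"pump_on": False}}, app_key="my_app")
except RuntimeError as exc:
    print(exc)
```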
Inherited Methods
ProcessorDataClient inherits all methods from AsyncDataClient. Every method accepts optional agent_id and organisation_id parameters, both of which default to values from the processor's context.
Channel Operations
| Method | Description |
|---|---|
| list_channels(...) | List all channels for the agent |
| fetch_channel(channel_name, ...) | Fetch a single channel by name |
| create_channel(channel_name, ...) | Create a new channel |
| put_channel(channel_name, is_private, ...) | Create or update a channel |
Message Operations
| Method | Description |
|---|---|
| list_messages(channel_name, ...) | List messages with optional time range and field filters |
| iter_messages(channel_name, ...) | Return an async iterator over messages (auto-paginates) |
| fetch_message(channel_name, message_id, ...) | Fetch a single message by ID |
| create_message(channel_name, data, ...) | Publish a new message (anti-recursion protected) |
| update_message(channel_name, message_id, data, ...) | Update an existing message (anti-recursion protected) |
| delete_message(channel_name, message_id, ...) | Delete a message |
Aggregate Operations
| Method | Description |
|---|---|
| fetch_channel_aggregate(channel_name, ...) | Fetch the current aggregate state |
| fetch_channel_aggregate_attachment(channel_name, attachment_id, ...) | Download an aggregate attachment |
| update_channel_aggregate(channel_name, data, ...) | Update the aggregate (anti-recursion protected) |
Both update_channel_aggregate and update_message accept a replace_data parameter (default False). When False, the update is a PATCH — the new data is merged into the existing aggregate or message data. When True, the update is a PUT — the existing data is replaced entirely with the new payload.
```python
# Merge new fields into the existing aggregate (default PATCH)
await self.api.update_channel_aggregate("status", {"temperature": 23.5})

# Replace the entire aggregate with just this data (PUT)
await self.api.update_channel_aggregate(
    "location", {"lat": -33.86, "lng": 151.21}, replace_data=True,
)
```
Use replace_data=True when the aggregate should be exactly the new data rather than a merge — for example, GPS location where stale fields from a previous update would be misleading.
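The difference between the two modes is easiest to see on plain dictionaries. This is a local model, not a call to the API: apply_update is hypothetical, the PATCH branch assumes a shallow top-level merge (the server may treat nested objects differently), and accuracy_m is an invented field used to show a stale value surviving a merge.

```python
from typing import Any


def apply_update(existing: dict[str, Any], new: dict[str, Any], replace_data: bool = False) -> dict[str, Any]:
    """Model of the two update modes; PATCH is assumed to be a shallow merge."""
    if replace_data:
        # PUT: the stored data becomes exactly the new payload
        return dict(new)
    # PATCH: keep existing fields, overwrite or add the ones in the payload
    merged = dict(existing)
    merged.update(new)
    return merged


stale = {"lat": -33.0, "lng": 151.0, "accuracy_m": 500}
print(apply_update(stale, {"lat": -33.86, "lng": 151.21}))
# {'lat': -33.86, 'lng': 151.21, 'accuracy_m': 500}
print(apply_update(stale, {"lat": -33.86, "lng": 151.21}, replace_data=True))
# {'lat': -33.86, 'lng': 151.21}
```

The merged result still carries accuracy_m from the earlier fix, which is exactly the misleading leftover that replace_data=True avoids.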
Timeseries
| Method | Description |
|---|---|
| fetch_timeseries(channel_name, field_names, ...) | Fetch timeseries data for specific fields |
| list_data_series(field_name, ...) | List data series with time range filters |
Connection Helpers
These methods manage the agent's connection status in the doover_connection channel.
ping_connection_at
Reports the agent's connection status at a specific time. This updates both the message log and aggregate on the doover_connection channel.
```python
from datetime import datetime, timezone
from pydoover.models import ConnectionStatus, ConnectionDetermination

await self.api.ping_connection_at(
    online_at=datetime.now(tz=timezone.utc),
    connection_status=ConnectionStatus.periodic_unknown,
    determination=ConnectionDetermination.online,
    user_agent="my-processor",
)
```
The method also resolves the processor's public IP address by default (via checkip.amazonaws.com). Set self.api.lookup_ip = False to disable this.
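For reference, a lookup against that service amounts to an HTTP GET that returns the caller's address as plain text. The sketch below uses only the standard library and is not pydoover's implementation. lookup_public_ip is a hypothetical name, and returning None on network failure (rather than raising) is an assumption made so a failed lookup would not block a ping. The fetch parameter exists so the function can be exercised without network access.

```python
import urllib.request
from typing import Callable, Optional

CHECKIP_URL = "https://checkip.amazonaws.com"


def _http_get(url: str, timeout: float) -> str:
    # Plain HTTP GET; the service replies with the caller's IP and a newline
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read().decode("utf-8")


def lookup_public_ip(
    fetch: Callable[[str, float], str] = _http_get,
    timeout: float = 5.0,
) -> Optional[str]:
    """Return the public IP reported by checkip, or None if the lookup fails."""
    try:
        return fetch(CHECKIP_URL, timeout).strip() or None
    except OSError:
        # urllib.error.URLError and socket timeouts are OSError subclasses
        return None


# Offline demo with a stubbed fetch (203.0.113.7 is a documentation address)
print(lookup_public_ip(fetch=lambda url, timeout: "203.0.113.7\n"))
```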
In most cases, use the higher-level self.ping_connection() method on the Application class instead of calling self.api.ping_connection_at() directly. The Application method handles offline threshold calculation and determination logic automatically.
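At its core, an offline-threshold decision compares the time since the last ping with the configured offline_after. The sketch below is a hypothetical simplification and is not taken from Application.ping_connection(): it returns plain strings instead of ConnectionDetermination members, and treating a missing threshold as "online" is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def determine_connection(
    last_ping: datetime,
    offline_after: Optional[float],
    now: Optional[datetime] = None,
) -> str:
    """Return "offline" once more than offline_after seconds pass without a ping."""
    now = now or datetime.now(tz=timezone.utc)
    if offline_after is None:
        # No threshold configured: trust the most recent ping
        return "online"
    elapsed = (now - last_ping).total_seconds()
    return "offline" if elapsed > offline_after else "online"


last = datetime(2026, 1, 15, 10, 0, tzinfo=timezone.utc)
print(determine_connection(last, 3600, now=last + timedelta(minutes=30)))  # online
print(determine_connection(last, 3600, now=last + timedelta(hours=2)))     # offline
```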
update_connection_config
Updates the connection configuration for an agent, including connection type, offline threshold, and display settings.
```python
from pydoover.models import ConnectionConfig, ConnectionType, ConnectionDisplay

config = ConnectionConfig(
    connection_type=ConnectionType.continuous,
    offline_after=3600,
    display=ConnectionDisplay.always,
)
await self.api.update_connection_config(config)
```
| Parameter | Type | Description |
|---|---|---|
| connection_type | ConnectionType | Required. continuous, periodic_continuous, or periodic. |
| expected_interval | float or None | Expected seconds between connections (for periodic types). |
| offline_after | float or None | Seconds without a ping before the agent is marked offline. |
| sleep_time | float or None | Expected sleep duration in seconds. |
| next_wake_time | float or None | Timestamp of the next expected wake. |
| display | ConnectionDisplay or None | Portal visibility: always, online_only, offline_only, or never. |
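For periodic agents it can make sense to derive offline_after from expected_interval, leaving some slack so a single late check-in does not mark the agent offline. The helper below is hypothetical, the 1.5 grace factor is an arbitrary illustrative choice, and the commented ConnectionConfig call is unverified beyond the parameter names listed in the table.

```python
from typing import Optional


def periodic_connection_values(
    expected_interval: float,
    grace_factor: float = 1.5,
    sleep_time: Optional[float] = None,
) -> dict:
    """Hypothetical helper: derive offline_after from the expected check-in interval."""
    if expected_interval <= 0:
        raise ValueError("expected_interval must be positive")
    return {
        "expected_interval": expected_interval,
        # Slack so one late check-in does not flip the agent offline
        "offline_after": expected_interval * grace_factor,
        "sleep_time": sleep_time,
    }


values = periodic_connection_values(expected_interval=900, sleep_time=840)
print(values)
# Possible use with pydoover (unverified):
# ConnectionConfig(connection_type=ConnectionType.periodic, **values)
```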
Typical Usage Patterns
Reading and Writing Data
```python
async def on_schedule(self, event):
    # Read the latest 50 messages from a channel
    messages = await self.api.list_messages("sensor_readings", limit=50)

    # Average only messages that report a temperature, so missing
    # readings do not drag the result towards zero (None if there are none)
    temps = [m.data["temperature"] for m in messages if "temperature" in m.data]
    avg_temp = sum(temps) / len(temps) if temps else None

    # Write the result to a different channel
    await self.api.create_message("daily_summary", {
        "average_temperature": avg_temp,
        "sample_count": len(temps),
    })
```
Iterating Over Large Datasets
The iter_messages method returns an async iterator that automatically paginates through results.
```python
async def on_manual_invoke(self, event):
    # Process all messages in a channel without loading them all into memory
    async for message in self.api.iter_messages("raw_data", page_size=100):
        await self.process_record(message.data)
```
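Auto-pagination of this kind is commonly built as an async generator that fetches one page, yields its items, and requests the next page only when the consumer asks for more. The sketch below illustrates that pattern with an in-memory fake backend. iter_pages, fake_fetch_page, and the string-cursor scheme are all hypothetical, and pydoover may paginate by message ID or time range instead.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Optional

# A page is (items, cursor_for_next_page); the cursor is None on the last page
Page = tuple[list[dict], Optional[str]]


async def iter_pages(
    fetch_page: Callable[[Optional[str], int], Awaitable[Page]],
    page_size: int = 100,
) -> AsyncIterator[dict]:
    """Yield items one at a time, requesting the next page only when needed."""
    cursor: Optional[str] = None
    while True:
        items, cursor = await fetch_page(cursor, page_size)
        for item in items:
            yield item
        if cursor is None:
            break


RECORDS = [{"id": i} for i in range(250)]


async def fake_fetch_page(cursor: Optional[str], page_size: int) -> Page:
    # Stand-in backend: the cursor is simply the next start offset
    start = int(cursor) if cursor else 0
    end = start + page_size
    next_cursor = str(end) if end < len(RECORDS) else None
    return RECORDS[start:end], next_cursor


async def main() -> int:
    count = 0
    async for _record in iter_pages(fake_fetch_page, page_size=100):
        count += 1
    return count


print(asyncio.run(main()))  # 250
```

Only one page is held at a time, which is what lets the handler above walk a large channel without loading it all into memory.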
Cross-Agent Operations
Processors with extended permissions can read and write data on other agents.
```python
async def on_schedule(self, event):
    # Read data from multiple agents
    for device_id in self.config.monitored_devices.value:
        readings = await self.api.fetch_channel_aggregate(
            "sensor_readings",
            agent_id=int(device_id),
        )
        await self.aggregate_readings(device_id, readings)
```
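The loop in this example reads agents one at a time. If the list of monitored devices is long, the reads could instead run concurrently with a cap on how many are in flight. This is a general asyncio technique (asyncio.gather plus asyncio.Semaphore) swapped in for illustration, not something the original example does. gather_bounded and fake_fetch are hypothetical, and you should check your API rate limits before raising limit.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")


async def gather_bounded(
    agent_ids: Iterable[int],
    fetch: Callable[[int], Awaitable[T]],
    limit: int = 5,
) -> dict[int, T]:
    """Run fetch(agent_id) for every agent, with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(agent_id: int) -> tuple[int, T]:
        async with semaphore:
            return agent_id, await fetch(agent_id)

    results = await asyncio.gather(*(run_one(a) for a in agent_ids))
    return dict(results)


async def fake_fetch(agent_id: int) -> dict:
    # Stand-in for a network call; inside a processor this might be
    # self.api.fetch_channel_aggregate("sensor_readings", agent_id=agent_id)
    await asyncio.sleep(0.01)
    return {"agent_id": agent_id, "temperature": 20 + agent_id % 5}


print(asyncio.run(gather_bounded([101, 102, 103], fake_fetch, limit=2)))
```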
Related Pages
- Application Class -- the processor base class that provides self.api
- Data Client -- the base DataClient and AsyncDataClient reference
- Channels -- channel operations in detail
- Messages -- message CRUD, iteration, and timeseries