Processor Data Client

ProcessorDataClient is the API client available inside every processor as self.api. It extends AsyncDataClient with processor-specific behaviour: automatic agent_id defaulting, anti-recursion protection for invoking channels, and connection management helpers.

Accessing the Client

The client is created automatically when the processor is instantiated and is available as self.api inside any handler or lifecycle method.

from pydoover.processor import Application
from pydoover.models import MessageCreateEvent


class MyProcessor(Application):
    async def on_message_create(self, event: MessageCreateEvent):
        # self.api is the ProcessorDataClient
        channels = await self.api.list_channels()
        for ch in channels:
            print(f"Channel: {ch.name}")

You do not need to create, configure, or close the client yourself. The framework handles all of this during the invocation lifecycle.

Agent ID Defaulting

In the standard AsyncDataClient, most methods require an agent_id parameter. ProcessorDataClient removes this requirement by defaulting to the processor's own agent ID (set during the setup phase).

# With a regular AsyncDataClient, you must pass agent_id
channels = await client.list_channels(agent_id=12345)

# With ProcessorDataClient inside a processor, agent_id is optional
channels = await self.api.list_channels()

You can still pass an explicit agent_id to operate on a different agent. This is useful for processors with extended permissions that need to read or write data on other agents.

# Read data from a different agent
other_channels = await self.api.list_channels(agent_id=67890)

The resolution logic is straightforward: if you pass an agent_id, it is used; otherwise, the client falls back to self.api.agent_id, which is set from the processor's context during setup.
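
The fallback described above can be sketched in a few lines. This is only an illustration of the rule, not the pydoover implementation: the AgentIdResolver class and its resolve method are hypothetical names, and raising ValueError when no default has been set is an assumption.

```python
from typing import Optional


class AgentIdResolver:
    """Hypothetical stand-in for the defaulting rule; not the pydoover class."""

    def __init__(self, default_agent_id: Optional[int] = None):
        # In a real processor this value would be populated during setup.
        self.agent_id = default_agent_id

    def resolve(self, agent_id: Optional[int] = None) -> int:
        # An explicitly passed agent_id always wins.
        if agent_id is not None:
            return agent_id
        # Otherwise fall back to the processor's own agent ID.
        if self.agent_id is None:
            # Assumption: with no default available there is nothing to use.
            raise ValueError("no agent_id passed and no default set")
        return self.agent_id


resolver = AgentIdResolver(default_agent_id=12345)
print(resolver.resolve())       # falls back to the default
print(resolver.resolve(67890))  # explicit value overrides the default
```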

Anti-Recursion Protection

When a processor is triggered by a MessageCreateEvent or AggregateUpdateEvent, the framework records the name of the channel that caused the invocation. If the handler then tries to write back to that same channel, the client raises a RuntimeError to prevent infinite loops.

async def on_message_create(self, event: MessageCreateEvent):
    # This will raise RuntimeError because "sensor_readings"
    # is the channel that triggered this invocation
    await self.api.create_message("sensor_readings", {"processed": True})

This protection applies to three write operations:

  • create_message
  • update_channel_aggregate
  • update_message
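
To make the check concrete, here is a minimal sketch of how such a guard could work. GuardedWriter is a hypothetical stand-in that stores writes in a list. It is not the pydoover client, and the real check may differ in detail.

```python
from typing import Any, Dict, List, Optional, Tuple


class GuardedWriter:
    """Hypothetical stand-in illustrating the guard; not pydoover code."""

    def __init__(self, invoking_channel: Optional[str] = None):
        # Name of the channel that triggered this invocation, if any.
        self.invoking_channel = invoking_channel
        self.written: List[Tuple[str, Dict[str, Any]]] = []

    def _check_not_invoking(self, channel_name: str) -> None:
        # Refuse writes that target the channel that caused the invocation.
        if self.invoking_channel is not None and channel_name == self.invoking_channel:
            raise RuntimeError(
                f"refusing to write to invoking channel {channel_name!r}"
            )

    def create_message(self, channel_name: str, data: Dict[str, Any]) -> None:
        self._check_not_invoking(channel_name)
        self.written.append((channel_name, data))


writer = GuardedWriter(invoking_channel="sensor_readings")
writer.create_message("daily_summary", {"ok": True})  # different channel: allowed

try:
    writer.create_message("sensor_readings", {"processed": True})
except RuntimeError as exc:
    print(f"blocked: {exc}")
```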

Overriding the Protection

In some cases, you intentionally need to write back to the invoking channel (e.g., updating an aggregate on the same channel that emitted a message). Pass allow_invoking_channel=True to bypass the check.

async def on_message_create(self, event: MessageCreateEvent):
    # Explicitly allow writing to the invoking channel
    await self.api.update_channel_aggregate(
        event.channel.name,
        {"last_processed": "2026-01-15T10:30:00Z"},
        allow_invoking_channel=True,
    )
Warning

When you override the anti-recursion protection, you are responsible for ensuring that the write does not trigger another invocation that writes back again. A common safe pattern is to write to the channel's aggregate (which may not trigger a subscription) rather than creating a new message.
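
One way to meet that responsibility is to mark payloads the processor has already handled and skip them on the next invocation. The sketch below is an assumption-laden illustration: the "processed" marker field and both helper functions are hypothetical and not part of pydoover.

```python
from typing import Any, Dict, Mapping

# Hypothetical marker field; pick any name your payloads do not already use.
PROCESSED_FLAG = "processed"


def needs_processing(data: Mapping[str, Any]) -> bool:
    """Return True only for payloads that do not carry the marker yet."""
    return not data.get(PROCESSED_FLAG, False)


def mark_processed(data: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a copy of the payload with the marker set."""
    return {**data, PROCESSED_FLAG: True}


incoming = {"temperature": 23.5}
if needs_processing(incoming):
    outgoing = mark_processed(incoming)
    # A write-back of `outgoing` would be skipped by needs_processing()
    # if it triggered the processor again.
    print(needs_processing(outgoing))
```

A handler would call needs_processing on the event payload first and return early when it is False, so a write-back cannot start a second round.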

Tag Values Channel

The tag_values channel receives special handling. Writing to it is allowed as long as the data is scoped to the processor's own app key. If you attempt to write tag data for a different app key, the client raises a RuntimeError.

This means normal tag operations via self.set_tag() work without any override, because the tag manager always scopes writes to the current app key.
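
The scoping rule can be pictured with a small check like the one below. It assumes, purely for illustration, that the top-level keys of a tag payload are app keys. The actual layout of the tag_values data is not described here, and check_tag_write is a hypothetical helper.

```python
from typing import Any, Mapping


def check_tag_write(data: Mapping[str, Any], own_app_key: str) -> None:
    """Raise if the payload touches any app key other than our own.

    Assumption: the payload's top-level keys are app keys.
    """
    foreign = sorted(key for key in data if key != own_app_key)
    if foreign:
        raise RuntimeError(f"refusing to write tags for other app keys: {foreign}")


# Writing under the processor's own app key passes the check.
check_tag_write({"my_app": {"alarm": True}}, own_app_key="my_app")

# Writing under another app key is rejected.
try:
    check_tag_write({"other_app": {"alarm": True}}, own_app_key="my_app")
except RuntimeError as exc:
    print(f"blocked: {exc}")
```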

Inherited Methods

ProcessorDataClient inherits all methods from AsyncDataClient. Every method accepts optional agent_id and organisation_id parameters, both of which default to the values from the processor's context.

Channel Operations

| Method | Description |
|---|---|
| list_channels(...) | List all channels for the agent |
| fetch_channel(channel_name, ...) | Fetch a single channel by name |
| create_channel(channel_name, ...) | Create a new channel |
| put_channel(channel_name, is_private, ...) | Create or update a channel |

Message Operations

| Method | Description |
|---|---|
| list_messages(channel_name, ...) | List messages with optional time range and field filters |
| iter_messages(channel_name, ...) | Return an async iterator over messages (auto-paginates) |
| fetch_message(channel_name, message_id, ...) | Fetch a single message by ID |
| create_message(channel_name, data, ...) | Publish a new message (anti-recursion protected) |
| update_message(channel_name, message_id, data, ...) | Update an existing message (anti-recursion protected) |
| delete_message(channel_name, message_id, ...) | Delete a message |

Aggregate Operations

| Method | Description |
|---|---|
| fetch_channel_aggregate(channel_name, ...) | Fetch the current aggregate state |
| fetch_channel_aggregate_attachment(channel_name, attachment_id, ...) | Download an aggregate attachment |
| update_channel_aggregate(channel_name, data, ...) | Update the aggregate (anti-recursion protected) |

Both update_channel_aggregate and update_message accept a replace_data parameter (default False). When False, the update is a PATCH: the new data is merged into the existing aggregate or message data. When True, the update is a PUT: the existing data is replaced entirely with the new payload.

# Merge new fields into the existing aggregate (default PATCH)
await self.api.update_channel_aggregate("status", {"temperature": 23.5})

# Replace the entire aggregate with just this data (PUT)
await self.api.update_channel_aggregate(
    "location", {"lat": -33.86, "lng": 151.21}, replace_data=True,
)

Use replace_data=True when the aggregate should be exactly the new data rather than a merge — for example, GPS location where stale fields from a previous update would be misleading.
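
The difference between the two modes can be seen with plain dictionaries. This is a local sketch only: apply_update is a hypothetical helper, and it assumes a shallow merge, whereas the server-side merge may well handle nested objects differently.

```python
from typing import Any, Dict


def apply_update(
    existing: Dict[str, Any], new: Dict[str, Any], replace_data: bool = False
) -> Dict[str, Any]:
    """Mimic PATCH (merge) and PUT (replace) semantics on plain dicts."""
    if replace_data:
        # PUT: the result is exactly the new payload.
        return dict(new)
    # PATCH: keys in `new` overwrite, everything else is kept.
    # Assumption: shallow merge only.
    return {**existing, **new}


location = {"lat": -33.86, "lng": 151.21, "altitude": 58.0}
update = {"lat": -37.81, "lng": 144.96}

merged = apply_update(location, update)
replaced = apply_update(location, update, replace_data=True)

print(merged)    # the stale altitude from the earlier fix is still present
print(replaced)  # only lat and lng remain
```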

Timeseries

| Method | Description |
|---|---|
| fetch_timeseries(channel_name, field_names, ...) | Fetch timeseries data for specific fields |
| list_data_series(field_name, ...) | List data series with time range filters |

Connection Helpers

These methods manage the agent's connection status in the doover_connection channel.

ping_connection_at

Reports the agent's connection status at a specific time. This updates both the message log and aggregate on the doover_connection channel.

from datetime import datetime, timezone
from pydoover.models import ConnectionStatus, ConnectionDetermination

await self.api.ping_connection_at(
    online_at=datetime.now(tz=timezone.utc),
    connection_status=ConnectionStatus.periodic_unknown,
    determination=ConnectionDetermination.online,
    user_agent="my-processor",
)

The method also resolves the processor's public IP address by default (via checkip.amazonaws.com). Set self.api.lookup_ip = False to disable this.

Note

In most cases, use the higher-level self.ping_connection() method on the Application class instead of calling self.api.ping_connection_at() directly. The Application method handles offline threshold calculation and determination logic automatically.

update_connection_config

Updates the connection configuration for an agent, including connection type, offline threshold, and display settings.

from pydoover.models import ConnectionConfig, ConnectionType, ConnectionDisplay

config = ConnectionConfig(
    connection_type=ConnectionType.continuous,
    offline_after=3600,
    display=ConnectionDisplay.always,
)
await self.api.update_connection_config(config)

ConnectionConfig accepts the following fields:

| Parameter | Type | Description |
|---|---|---|
| connection_type | ConnectionType | Required. continuous, periodic_continuous, or periodic. |
| expected_interval | float or None | Expected seconds between connections (for periodic types). |
| offline_after | float or None | Seconds without a ping before the agent is marked offline. |
| sleep_time | float or None | Expected sleep duration in seconds. |
| next_wake_time | float or None | Timestamp of next expected wake. |
| display | ConnectionDisplay or None | Portal visibility: always, online_only, offline_only, or never. |
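
As a rough illustration of what offline_after means, the sketch below compares the time since the last ping against the threshold. The is_offline helper is hypothetical, and treating a missing threshold as "never marked offline" is an assumption. The platform's actual determination logic is not documented here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_offline(
    last_ping: datetime, offline_after: Optional[float], now: datetime
) -> bool:
    """Return True when more than offline_after seconds have passed since last_ping."""
    if offline_after is None:
        # Assumption: without a threshold, the agent is never auto-marked offline.
        return False
    return (now - last_ping).total_seconds() > offline_after


now = datetime(2026, 1, 15, 12, 0, tzinfo=timezone.utc)
print(is_offline(now - timedelta(minutes=10), 3600, now))  # within the hour
print(is_offline(now - timedelta(hours=2), 3600, now))     # past the threshold
```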

Typical Usage Patterns

Reading and Writing Data

async def on_schedule(self, event):
    # Read the latest 50 messages from a channel
    messages = await self.api.list_messages("sensor_readings", limit=50)

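    # Average only the messages that actually carry a temperature reading,
    # so missing values do not drag the result towards zero
    temps = [m.data["temperature"] for m in messages if "temperature" in m.data]
    avg_temp = sum(temps) / len(temps) if temps else 0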

    # Write the result to a different channel
    await self.api.create_message("daily_summary", {
        "average_temperature": avg_temp,
        "sample_count": len(temps),
    })

Iterating Over Large Datasets

The iter_messages method returns an async iterator that automatically paginates through results.

async def on_manual_invoke(self, event):
    # Process all messages in a channel without loading them all into memory
    async for message in self.api.iter_messages("raw_data", page_size=100):
        await self.process_record(message.data)
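
For readers curious how an auto-paginating async iterator works in general, here is a self-contained sketch built on an async generator. It does not reproduce pydoover's implementation: fetch_page, iter_records and the offset-based paging scheme are all assumptions, and the data comes from an in-memory list rather than the API.

```python
import asyncio
from typing import AsyncIterator, List

# In-memory records standing in for a remote channel.
FAKE_MESSAGES = [{"id": i, "value": i * 10} for i in range(7)]


async def fetch_page(offset: int, page_size: int) -> List[dict]:
    """Hypothetical page fetch; a real client would make a network request."""
    await asyncio.sleep(0)  # yield control, as a network call would
    return FAKE_MESSAGES[offset:offset + page_size]


async def iter_records(page_size: int = 3) -> AsyncIterator[dict]:
    """Yield records one at a time, fetching the next page only when needed."""
    offset = 0
    while True:
        page = await fetch_page(offset, page_size)
        if not page:
            return  # an empty page means there is nothing left
        for record in page:
            yield record
        offset += len(page)


async def collect_ids() -> List[int]:
    # Only one page is held in memory at a time while iterating.
    return [record["id"] async for record in iter_records(page_size=3)]


print(asyncio.run(collect_ids()))
```

The caller sees a single flat stream of records and never handles page boundaries, which is what lets the handler above process a large channel with a plain async for loop.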

Cross-Agent Operations

Processors with extended permissions can read and write data on other agents.

async def on_schedule(self, event):
    # Read data from multiple agents
    for device_id in self.config.monitored_devices.value:
        readings = await self.api.fetch_channel_aggregate(
            "sensor_readings",
            agent_id=int(device_id),
        )
        await self.aggregate_readings(device_id, readings)
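
When many agents are involved, the sequential loop above can be slow. One option is to run the reads concurrently with a cap on how many are in flight. The sketch below uses a stub in place of self.api.fetch_channel_aggregate. The fetch_aggregate and fetch_all names and the concurrency limit are assumptions, and whether the API tolerates concurrent requests should be checked before relying on this.

```python
import asyncio
from typing import Dict, List


async def fetch_aggregate(agent_id: int) -> dict:
    """Stub standing in for self.api.fetch_channel_aggregate(..., agent_id=...)."""
    await asyncio.sleep(0)  # pretend to wait on the network
    return {"agent_id": agent_id, "temperature": 20.0 + agent_id % 5}


async def fetch_all(agent_ids: List[int], max_concurrent: int = 5) -> Dict[int, dict]:
    """Fetch aggregates for several agents, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(agent_id: int) -> dict:
        async with semaphore:
            return await fetch_aggregate(agent_id)

    # gather() returns results in the same order as the inputs.
    results = await asyncio.gather(*(fetch_one(a) for a in agent_ids))
    return dict(zip(agent_ids, results))


readings = asyncio.run(fetch_all([101, 102, 103], max_concurrent=2))
for agent_id, aggregate in readings.items():
    print(agent_id, aggregate["temperature"])
```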

Related Pages

  • Application Class -- the processor base class that provides self.api
  • Data Client -- the base DataClient and AsyncDataClient reference
  • Channels -- channel operations in detail
  • Messages -- message CRUD, iteration, and timeseries