Processor Application Class
The Application class in pydoover.processor is the base class for all cloud processors. You subclass it, override the event handlers and lifecycle methods you need, and pass an instance to run_app in your Lambda handler.
Constructor
The Application constructor takes no arguments. It creates the internal ProcessorDataClient, initialises a default ProcessorConfig, and prepares all instance attributes for population during the setup phase.
from pydoover.processor import Application
class MyProcessor(Application):
    pass
Instance attributes like agent_id, app_key, config, tags, ui, and api are not available in __init__. They are populated during the internal _setup phase, which runs after the pre-hook filter passes. Use the setup() method for any initialisation that depends on these attributes.
Class Attributes
Three class-level attributes control which Schema, UI, and Tags subclasses the processor uses:
| Attribute | Type | Default | Purpose |
|---|---|---|---|
| config_cls | type[Schema] | Schema | Configuration schema class. Set this to your custom Schema subclass. |
| ui_cls | type[UI] | UI | UI definition class. Set this to your custom UI subclass. |
| tags_cls | type[Tags] | Tags | Tags definition class. Set this to your custom Tags subclass. |
from pydoover.processor import Application
from pydoover import config
from pydoover.tags import Tags, Number
from pydoover.ui import UI, NumericVariable

class MyConfig(config.Schema):
    poll_interval = config.Number("Poll Interval", default=30.0)
    site_name = config.String("Site Name", default="Site A")

class MyTags(Tags):
    temperature = Number(default=0.0)

class MyUI(UI):
    temp_display = NumericVariable(
        "Temperature",
        units="C",
        precision=1,
    )

class MyProcessor(Application):
    config_cls = MyConfig
    ui_cls = MyUI
    tags_cls = MyTags
This tells the framework to use your custom config, tags, and UI classes. They are instantiated automatically during setup and made available as self.config, self.tags, and self.ui.
Instance Attributes
After setup completes, the following attributes are available on the processor instance:
| Attribute | Type | Description |
|---|---|---|
| agent_id | int | The agent this processor is deployed to |
| app_key | str | The application key for this deployment |
| app_id | str | The application ID |
| organisation_id | int \| str | The organisation that owns this agent |
| api | ProcessorDataClient | The API client for data operations |
| config | Schema | The loaded configuration instance |
| tags | Tags | The tags manager instance |
| ui | UI | The UI definition instance |
| rpc | RPCManager | The RPC handler manager |
| ui_manager | UICommandsManager | Handles incoming UI interaction events |
| tag_manager | TagsManagerProcessor | Low-level tag operations |
| event_type | str | The current event's operation name (e.g., "on_message_create") |
Lifecycle Methods
setup()
Called after the framework's internal setup completes (token upgrade, tag loading, config injection). Override this to perform your own initialisation.
async def setup(self):
    # Fetch a channel you will write to later
    self.output_channel = await self.fetch_channel("processed_data")
    # Initialise any state your handlers need
    self.threshold = self.config.alert_threshold.value
You do not need to call super().setup(). The base implementation is a no-op.
close()
Called after the event handler returns and tags have been committed. Override this to clean up resources (close HTTP sessions, flush buffers, etc.).
async def close(self):
    if hasattr(self, "external_session"):
        await self.external_session.close()
You do not need to call super().close(). The base implementation is a no-op.
Event Handlers
Override these methods to handle specific event types. Each receives a typed event object. If you do not override a handler, the event is skipped with SkipReason.no_handler (except on_deployment, which always runs the framework's UI schema publishing).
on_message_create(event: MessageCreateEvent)
Fires when a new message arrives on a channel this processor is subscribed to.
async def on_message_create(self, event: MessageCreateEvent):
    channel_name = event.channel.name
    data = event.message.data
    if channel_name == "sensor_readings":
        await self.process_sensor_data(data)
on_aggregate_update(event: AggregateUpdateEvent)
Fires when a channel's aggregate state is updated.
async def on_aggregate_update(self, event: AggregateUpdateEvent):
    new_state = event.aggregate.data
    previous_request = event.request_data.data
    # React to the state change
A device processor that filters by channel name, extracts sensor data from the aggregate, updates the UI, and pings the connection status:
from pydoover.models import AggregateUpdateEvent, ConnectionStatus

async def on_aggregate_update(self, event: AggregateUpdateEvent):
    # Republish the UI schema when the deployment config changes
    if event.channel.name == "deployment_config":
        await self.publish_ui_schema()
        return
    # Ignore every other channel except the device uplink
    if event.channel.name != "device_uplink":
        return
    aggregate = event.aggregate.data
    readings = aggregate["data"]["readings"]
    # _build_graph_data is a helper defined elsewhere on the class
    graph_data = self._build_graph_data(readings)
    await self.api.update_channel_aggregate("ui_state", graph_data)
    await self.api.create_message("ui_state", graph_data)
    await self.ping_connection(
        connection_status=ConnectionStatus.continuous_online,
    )
on_deployment(event: DeploymentEvent)
Fires when the application is deployed or redeployed to an agent. The framework automatically publishes the UI schema on deployment (for non-static UIs), so you do not need to do that yourself.
Override this to run additional one-off setup such as seeding channels or priming tag values.
async def on_deployment(self, event: DeploymentEvent):
    # Create a channel that this processor will write to
    await self.api.create_channel("processed_data", is_private=True)
    # Set initial tag values
    await self.set_tag("last_deployed", event.app_display_name)
A device processor that configures connection tracking on deployment so the portal shows online/offline status:
from pydoover.models import (
    DeploymentEvent, ConnectionStatus,
    ConnectionConfig, ConnectionType,
)
from pydoover.models.data.connection import ConnectionDisplay

async def on_deployment(self, event: DeploymentEvent):
    await self.publish_ui_schema()
    await self.api.update_connection_config(ConnectionConfig(
        connection_type=ConnectionType.continuous,
        offline_after=3600,
        display=ConnectionDisplay.always,
    ))
    await self.ping_connection(
        connection_status=ConnectionStatus.continuous_online,
    )
on_schedule(event: ScheduleEvent)
Fires when a cron or rate schedule triggers.
async def on_schedule(self, event: ScheduleEvent):
    # Generate a daily report
    await self.generate_report()
A processor uses scheduled invocations to generate dummy sensor data for testing. Note the use of allow_invoking_channel=True to write to a channel the processor is subscribed to:
async def on_schedule(self, event: ScheduleEvent):
    if not self.config.test_mode.value:
        return
    await self.api.create_message(
        "sensor_uplink",
        self._generate_dummy_data(),
        allow_invoking_channel=True,
    )
on_ingestion_endpoint(event: IngestionEndpointEvent)
Fires when an external HTTP request is received at the processor's ingestion endpoint.
The payload is automatically decoded from base64 and parsed as JSON by default. Override parse_ingestion_event_payload if your payload uses a different format (e.g., a C-packed struct).
async def on_ingestion_endpoint(self, event: IngestionEndpointEvent):
    # event.payload is the parsed request body
    device_data = event.payload
    await self.api.create_message("external_data", device_data)
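To make the default decoding concrete, the following standalone sketch shows the two steps described above (base64 decode, then JSON parse) next to a packed-struct alternative. It does not use pydoover: the function names and the `<HhB` struct layout are hypothetical, chosen only for illustration, and the real signature of parse_ingestion_event_payload should be taken from the pydoover reference.

```python
import base64
import json
import struct


def decode_json_payload(raw_b64: str) -> dict:
    """Base64-decode the request body, then parse the bytes as JSON."""
    return json.loads(base64.b64decode(raw_b64))


def decode_struct_payload(raw_b64: str) -> dict:
    """Decode a hypothetical 5-byte little-endian packed struct.

    Assumed layout (illustration only): uint16 serial number,
    int16 temperature in tenths of a degree, uint8 battery percent.
    """
    serial, temp_tenths, battery = struct.unpack("<HhB", base64.b64decode(raw_b64))
    return {
        "serial_number": serial,
        "temperature": temp_tenths / 10,
        "battery": battery,
    }
```

An override for a binary protocol would do the equivalent of decode_struct_payload and hand the resulting dict to the handler as event.payload.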
An integration processor that bridges external hardware monitors to Doover. It receives HTTP data via the ingestion endpoint, looks up the target device agent by serial number, and routes the data to the correct agent's channel using cross-agent messaging:
async def on_ingestion_endpoint(self, event: IngestionEndpointEvent):
    payload = event.payload
    if not isinstance(payload, dict) or "serial_number" not in payload:
        return
    serial_num = int(payload["serial_number"])
    origin = payload.get("origin", "")
    if origin == "device_monitor":
        agent_id = self._lookup_agent_id(serial_num)
        if agent_id is None:
            return
        await self.api.create_message(
            "device_uplink", payload, agent_id=agent_id,
        )
        await self.api.update_channel_aggregate(
            "device_uplink", payload, agent_id=agent_id,
        )
    elif origin.startswith("doover"):
        self._send_command_to_external_api(payload)
This pattern — ingestion endpoint + serial number lookup + cross-agent write — is common for integrations that bridge between external hardware APIs and Doover device agents.
on_manual_invoke(event: ManualInvokeEvent)
Fires when the processor is triggered manually from the UI or CLI.
async def on_manual_invoke(self, event: ManualInvokeEvent):
    action = event.payload.get("action")
    if action == "recalculate":
        await self.recalculate_all()
Filtering
Processors support two filtering stages that let you reject events before the handler runs, reducing execution time and API costs.
pre_hook_filter(event) -> bool
Runs before any API calls or setup. The event is the typed event object (e.g., MessageCreateEvent). Return False to skip processing. This is the cheapest rejection point.
async def pre_hook_filter(self, event):
    # Only process messages from the "temperature" channel
    if hasattr(event, "channel") and event.channel.name != "temperature":
        return False
    return True
Because the pre-hook runs before the token upgrade and tag loading, you cannot access self.config, self.tags, or make API calls here. You can only inspect the raw event data.
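Because the pre-hook only inspects the event, its logic can be exercised locally without any platform state. The sketch below is one way to do that, assuming a stand-in class (TemperatureOnly) rather than a real Application subclass and SimpleNamespace objects in place of typed events; both are hypothetical test doubles, not pydoover types.

```python
import asyncio
from types import SimpleNamespace


class TemperatureOnly:
    """Stand-in for an Application subclass; only the filter is shown."""

    async def pre_hook_filter(self, event) -> bool:
        # Reject events from any channel other than "temperature";
        # events without a channel attribute pass through.
        if hasattr(event, "channel") and event.channel.name != "temperature":
            return False
        return True


def check(event) -> bool:
    """Run the async filter to completion for a single fake event."""
    return asyncio.run(TemperatureOnly().pre_hook_filter(event))
```

Calling check(SimpleNamespace(channel=SimpleNamespace(name="humidity"))) exercises the rejection path without touching self.config, self.tags, or the API client.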
post_setup_filter(event) -> bool
Runs after setup completes. The full processor state is available -- tags, config, API client. Return False to skip processing.
async def post_setup_filter(self, event):
    # Skip if the processor is in maintenance mode (a tag value)
    if self.get_tag("maintenance_mode"):
        return False
    return True
Use this when the filtering decision depends on configuration or tag state that is not available in the pre-hook.
Pre-hook filtering is significantly cheaper than post-setup filtering because it avoids the token upgrade and configuration loading API calls. Use the pre-hook for simple payload-based checks and reserve the post-setup filter for context-dependent decisions.
Tag Operations
Tags are persistent key-value pairs scoped to the processor's app key. They survive across invocations and are stored in the tag_values channel.
get_tag(key, default=None)
Read a tag value. Returns the default if the tag does not exist.
count = self.get_tag("invocation_count", default=0)
set_tag(key, value, log=False)
Set a tag value. Pass log=True to record the change in the tag history.
await self.set_tag("invocation_count", count + 1)
await self.set_tag("last_alert_time", "2026-01-15T10:30:00Z", log=True)
Tag changes are committed automatically at the end of the invocation. You do not need to call a commit method.
Self-Loop Protection
If a processor is subscribed to the tag_values channel and the incoming event was caused by this processor's own tag writes, the framework automatically skips the invocation with SkipReason.tag_values_self_loop. This prevents infinite recursion.
Notifications
send_notification(message, title=None, severity=None, topic=None, agent_id=None)
Send a notification through the platform's notification system. The notification is fanned out to any subscriptions (email, SMS, web push, HTTP webhook) that match the severity and topic.
from pydoover.models import NotificationSeverity
await self.send_notification(
    "Temperature exceeded 40C on sensor 3",
    title="High Temperature Alert",
    severity=NotificationSeverity.warning,
    topic="temperature",
)
You can also pass a pre-constructed Notification object for full control over the notification payload.
UI Schema Publishing
publish_ui_schema(clear=True)
Publishes the processor's UI schema to the ui_state channel aggregate. This is called automatically on deployment events (for non-static UIs), so you rarely need to call it manually.
When clear=True (the default), the existing UI state for this app key is replaced entirely. When clear=False, the schema is merged into the existing state.
# Force a UI schema republish
await self.publish_ui_schema()
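The difference between the two clear modes can be pictured with plain dictionaries. This is only a sketch: it assumes the UI state is a dict keyed by app key and that the merge is shallow, neither of which is confirmed here, and apply_ui_schema is a hypothetical helper rather than part of pydoover.

```python
def apply_ui_schema(ui_state: dict, app_key: str, schema: dict, clear: bool = True) -> dict:
    """Return a new ui_state with `schema` applied under `app_key`."""
    updated = dict(ui_state)
    if clear:
        # Replace whatever was stored for this app key
        updated[app_key] = dict(schema)
    else:
        # Keep existing entries; schema entries win on key collisions
        updated[app_key] = {**ui_state.get(app_key, {}), **schema}
    return updated
```

In both modes the entries of other app keys are left untouched; only the current app key's entry changes.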
Connection Tracking
ping_connection(online_at=None, connection_status=..., offline_at=None)
Report the connection status for this agent. This updates the doover_connection channel, which the platform uses for online/offline monitoring.
| Parameter | Type | Description |
|---|---|---|
| online_at | datetime or None | When the agent was last online. Defaults to datetime.now(tz=timezone.utc). |
| connection_status | ConnectionStatus | The connection status enum value. Defaults to ConnectionStatus.periodic_unknown. |
| offline_at | datetime or None | When the agent should be marked offline. If provided, offline_after is calculated as the delta between online_at and offline_at. |
If offline_at is not provided, the offline_after threshold is read from the agent's connection configuration (defaulting to 1 hour). The method automatically determines whether the agent is online or offline based on whether the current time exceeds the offline_after window.
from datetime import datetime, timezone
await self.ping_connection(
    online_at=datetime.now(tz=timezone.utc),
)
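The rules for choosing the offline window can be sketched in isolation as follows. This is a minimal model of the behaviour described above, not pydoover's implementation: the function names are hypothetical, and treating the exact boundary instant as still online is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

DEFAULT_OFFLINE_AFTER = timedelta(hours=1)  # the 1-hour default mentioned above


def resolve_offline_after(
    online_at: datetime,
    offline_at: Optional[datetime] = None,
    configured: Optional[timedelta] = None,
) -> timedelta:
    """An explicit offline_at wins, then the configured value, then the default."""
    if offline_at is not None:
        return offline_at - online_at
    if configured is not None:
        return configured
    return DEFAULT_OFFLINE_AFTER


def is_online(
    online_at: datetime,
    offline_after: timedelta,
    now: Optional[datetime] = None,
) -> bool:
    """Online while the current time is still inside the offline_after window."""
    now = now or datetime.now(timezone.utc)
    return now <= online_at + offline_after
```

For example, a ping with online_at at 10:00 and no offline_at resolves to a one-hour window, so the agent would count as offline from 11:00 onward unless another ping arrives.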
For periodic devices that wake on a schedule, pass offline_at to set the deadline by which the next check-in must arrive. The portal then shows the device as offline only if it misses that window. The example allows 2.5 sleep intervals (15 hours for a 6-hour sleep) before the device is marked offline.
from datetime import datetime, timezone, timedelta
from pydoover.models import ConnectionStatus

now = datetime.now(timezone.utc)
sleep_hours = 6
# Grace window: 2.5 sleep intervals after this check-in
offline_at = now + timedelta(hours=sleep_hours * 2.5)
await self.ping_connection(
    online_at=now,
    connection_status=ConnectionStatus.periodic_unknown,
    offline_at=offline_at,
)
Connection Tracking Patterns
Connection tracking combines update_connection_config (sets the connection type and thresholds) with ping_connection (reports each check-in). Two common patterns:
Continuous — for processors bridging always-on data streams. Set once on deployment:
from pydoover.models import (
    ConnectionConfig, ConnectionType, ConnectionStatus,
)
from pydoover.models.data.connection import ConnectionDisplay

async def on_deployment(self, event):
    await self.api.update_connection_config(ConnectionConfig(
        connection_type=ConnectionType.continuous,
        offline_after=3600,
        display=ConnectionDisplay.always,
    ))
    await self.ping_connection(
        connection_status=ConnectionStatus.continuous_online,
    )
Periodic — for battery-powered devices that wake on a schedule. Recalculate offline_after each time the device reports, based on its sleep interval:
from datetime import datetime, timedelta, timezone
from pydoover.models import ConnectionConfig, ConnectionType

OFFLINE_AFTER_MULTIPLIER = 2.5

async def _publish_connection_config(self, sleep_min: int):
    now = datetime.now(timezone.utc)
    interval_secs = sleep_min * 60
    config = ConnectionConfig(
        connection_type=ConnectionType.periodic,
        expected_interval=interval_secs,
        offline_after=interval_secs * OFFLINE_AFTER_MULTIPLIER,
        sleep_time=interval_secs,
        # Next expected wake-up as a millisecond epoch timestamp
        next_wake_time=int(
            (now + timedelta(seconds=interval_secs)).timestamp() * 1000
        ),
    )
    await self.api.update_connection_config(config)
Channel Fetching
fetch_channel(channel_name)
Convenience method to fetch a channel by name on the current agent.
channel = await self.fetch_channel("sensor_readings")
Invocation Summary
After every invocation, the framework automatically publishes a summary message to the configured invocation target channels. The summary includes:
- app_key, app_id, agent_id
- event_type (e.g., "on_message_create")
- started_at (millisecond timestamp)
- duration_ms
- status ("success", "skipped", or "error")
- skip_reason (if skipped: no_handler, pre_hook_filter, tag_values_self_loop, or post_setup_filter)
- error (type and message, if an error occurred)
The target channels are configured via ProcessorConfig.inv_targets. By default, the summary is published to a channel named dv-proc-inv-{app_id} on the processor's own agent.
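As a rough picture of what such a summary might look like, the sketch below assembles a record with the fields listed above and derives the default channel name. The key names follow the list, but the exact wire format is an assumption, and build_summary and default_invocation_channel are hypothetical helpers, not pydoover functions.

```python
from typing import Optional


def default_invocation_channel(app_id: str) -> str:
    """Default target channel name, following the dv-proc-inv-{app_id} pattern."""
    return f"dv-proc-inv-{app_id}"


def build_summary(
    app_key: str,
    app_id: str,
    agent_id: int,
    event_type: str,
    started_at: float,
    finished_at: float,
    status: str = "success",
    skip_reason: Optional[str] = None,
) -> dict:
    """Assemble a summary record; both times are seconds since the epoch."""
    summary = {
        "app_key": app_key,
        "app_id": app_id,
        "agent_id": agent_id,
        "event_type": event_type,
        "started_at": int(started_at * 1000),  # millisecond timestamp
        "duration_ms": int((finished_at - started_at) * 1000),
        "status": status,
    }
    # skip_reason is only present for skipped invocations
    if skip_reason is not None:
        summary["skip_reason"] = skip_reason
    return summary
```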
ProcessorSkipped and SkipReason
When a processor invocation is intentionally skipped, a ProcessorSkipped exception is raised internally. This is not an error -- it is the normal mechanism for recording why an invocation did not execute its handler.
The SkipReason enum has four values:
| Reason | When it occurs |
|---|---|
| no_handler | The event type has no overridden handler method |
| pre_hook_filter | pre_hook_filter() returned False |
| tag_values_self_loop | The event was caused by this processor's own tag writes |
| post_setup_filter | post_setup_filter() returned False |
Skipped invocations are tracked separately from errors in the invocation summary.
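The idea of using an exception to signal a skip, while keeping it distinct from a failure, can be sketched with stand-in types. Nothing here is imported from pydoover: SkipReason and Skipped below are local stand-ins for the real SkipReason and ProcessorSkipped, and summarise is a hypothetical helper showing one way the two outcomes could be told apart.

```python
from enum import Enum


class SkipReason(Enum):
    """Local stand-in mirroring the four values in the table above."""
    no_handler = "no_handler"
    pre_hook_filter = "pre_hook_filter"
    tag_values_self_loop = "tag_values_self_loop"
    post_setup_filter = "post_setup_filter"


class Skipped(Exception):
    """Stand-in for ProcessorSkipped: carries a reason, does not mean failure."""

    def __init__(self, reason: SkipReason):
        super().__init__(reason.value)
        self.reason = reason


def summarise(run) -> dict:
    """Call `run` and classify the outcome as success, skipped, or error."""
    try:
        run()
    except Skipped as skipped:
        # Checked before the generic handler so skips never count as errors
        return {"status": "skipped", "skip_reason": skipped.reason.value}
    except Exception as exc:
        return {
            "status": "error",
            "error": {"type": type(exc).__name__, "message": str(exc)},
        }
    return {"status": "success"}
```

Catching the skip exception before the generic Exception clause is what keeps skipped invocations out of the error count.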
Complete Example
This example shows a processor that monitors temperature readings, sends alerts when thresholds are exceeded, and tracks state via tags.
from pydoover.processor import Application, run_app
from pydoover import config
from pydoover.tags import Tags, Number, Boolean
from pydoover.models import MessageCreateEvent, DeploymentEvent, NotificationSeverity

class Config(config.Schema):
    high_temp_threshold = config.Number(
        "High Temperature Threshold",
        default=40.0,
        description="Temperature above which an alert is sent.",
    )
    site_name = config.String("Site Name", default="Unnamed Site")

class AppTags(Tags):
    last_temperature = Number(default=0.0)
    alert_active = Boolean(default=False)

class TemperatureMonitor(Application):
    config_cls = Config
    tags_cls = AppTags

    async def pre_hook_filter(self, event):
        # Only process messages from the temperature channel
        if hasattr(event, "channel"):
            return event.channel.name == "temperature"
        return True

    async def setup(self):
        self.threshold = self.config.high_temp_threshold.value
        self.site = self.config.site_name.value

    async def on_message_create(self, event: MessageCreateEvent):
        temp = event.message.data.get("value")
        if temp is None:
            return
        await self.set_tag("last_temperature", temp)
        if temp > self.threshold and not self.get_tag("alert_active"):
            await self.set_tag("alert_active", True)
            await self.send_notification(
                f"Temperature at {self.site} is {temp}C (threshold: {self.threshold}C)",
                title="High Temperature Alert",
                severity=NotificationSeverity.warning,
            )
        elif temp <= self.threshold and self.get_tag("alert_active"):
            await self.set_tag("alert_active", False)

    async def on_deployment(self, event: DeploymentEvent):
        # Seed initial tag values on first deployment
        await self.set_tag("alert_active", False)

# Lambda entry point
app = TemperatureMonitor()

def handler(event, context):
    return run_app(app, event, context)
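The alert logic in on_message_create is edge-triggered: a notification is sent only when the reading first crosses the threshold, not on every high reading. One way to reason about it, and to unit test it without the platform, is to pull the decision into a pure function. The helper next_alert_state below is hypothetical and not part of the example above.

```python
from typing import Tuple


def next_alert_state(temp: float, threshold: float, alert_active: bool) -> Tuple[bool, bool]:
    """Return (new_alert_active, should_notify) for one reading."""
    if temp > threshold and not alert_active:
        return True, True  # first reading above the threshold: raise and notify
    if temp <= threshold and alert_active:
        return False, False  # reading recovered: clear the alert without notifying
    return alert_active, False  # no state change
```

With a threshold of 40, a run of readings 41, 42, 39 produces one notification (at 41) and ends with the alert cleared.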
Related Pages
- Cloud Processors Overview -- high-level introduction and lifecycle summary
- Event Types -- all event types with their fields
- Processor Data Client -- API client with anti-recursion protection
- Configuration -- defining typed configuration schemas
- Application Lifecycle -- comparison of Docker and Processor lifecycles