Processor Application Class

The Application class in pydoover.processor is the base class for all cloud processors. You subclass it, override the event handlers and lifecycle methods you need, and pass an instance to run_app in your Lambda handler.

Constructor

The Application constructor takes no arguments. It creates the internal ProcessorDataClient, initialises a default ProcessorConfig, and prepares all instance attributes for population during the setup phase.

from pydoover.processor import Application

class MyProcessor(Application):
    pass

Instance attributes like agent_id, app_key, config, tags, ui, and api are not available in __init__. They are populated during the internal _setup phase, which runs after the pre-hook filter passes. Use the setup() method for any initialisation that depends on these attributes.

Class Attributes

Three class-level attributes control which Schema, UI, and Tags subclasses the processor uses:

Attribute    Type           Default   Purpose
config_cls   type[Schema]   Schema    Configuration schema class. Set this to your custom Schema subclass.
ui_cls       type[UI]       UI        UI definition class. Set this to your custom UI subclass.
tags_cls     type[Tags]     Tags      Tags definition class. Set this to your custom Tags subclass.

from pydoover.processor import Application
from pydoover import config
from pydoover.tags import Tags, Number
from pydoover.ui import UI, NumericVariable


class MyConfig(config.Schema):
    poll_interval = config.Number("Poll Interval", default=30.0)
    site_name = config.String("Site Name", default="Site A")


class MyTags(Tags):
    temperature = Number(default=0.0)


class MyUI(UI):
    temp_display = NumericVariable(
        "Temperature",
        units="C",
        precision=1,
    )


class MyProcessor(Application):
    config_cls = MyConfig
    ui_cls = MyUI
    tags_cls = MyTags

This tells the framework to use your custom config, tags, and UI classes. They are instantiated automatically during setup and made available as self.config, self.tags, and self.ui.

Instance Attributes

After setup completes, the following attributes are available on the processor instance:

Attribute         Type                   Description
agent_id          int                    The agent this processor is deployed to
app_key           str                    The application key for this deployment
app_id            str                    The application ID
organisation_id   int | str              The organisation that owns this agent
api               ProcessorDataClient    The API client for data operations
config            Schema                 The loaded configuration instance
tags              Tags                   The tags manager instance
ui                UI                     The UI definition instance
rpc               RPCManager             The RPC handler manager
ui_manager        UICommandsManager      Handles incoming UI interaction events
tag_manager       TagsManagerProcessor   Low-level tag operations
event_type        str                    The current event's operation name (e.g., "on_message_create")

Lifecycle Methods

setup()

Called after the framework's internal setup completes (token upgrade, tag loading, config injection). Override this to perform your own initialisation.

async def setup(self):
    # Fetch a channel you will write to later
    self.output_channel = await self.fetch_channel("processed_data")

    # Initialise any state your handlers need
    self.threshold = self.config.alert_threshold.value

You do not need to call super().setup(). The base implementation is a no-op.

close()

Called after the event handler returns and tags have been committed. Override this to clean up resources (close HTTP sessions, flush buffers, etc.).

async def close(self):
    if hasattr(self, "external_session"):
        await self.external_session.close()

You do not need to call super().close(). The base implementation is a no-op.

Event Handlers

Override these methods to handle specific event types. Each receives a typed event object. If you do not override a handler, the event is skipped with SkipReason.no_handler (except on_deployment, which always runs the framework's UI schema publishing).
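One common Python way to tell whether a subclass has overridden a handler is to compare the method found on the subclass with the one defined on the base class. The sketch below uses a stand-in ToyApplication rather than pydoover's Application, and resolve_handler is a hypothetical helper. It illustrates the no_handler rule described above and is not a description of how the framework implements it.

```python
import asyncio
from enum import Enum


class SkipReason(Enum):
    # Stand-in for the framework enum; only the member name comes from this page.
    no_handler = "no_handler"


class ToyApplication:
    """Stand-in base class with default (non-overridden) handlers."""

    async def on_message_create(self, event):
        pass

    async def on_schedule(self, event):
        pass

    def resolve_handler(self, event_type: str):
        """Return the bound handler, or SkipReason.no_handler if it was not overridden."""
        base_impl = getattr(ToyApplication, event_type, None)
        own_impl = getattr(type(self), event_type, None)
        if base_impl is None or own_impl is base_impl:
            return SkipReason.no_handler
        return getattr(self, event_type)


class OnlyMessages(ToyApplication):
    # Only this handler is overridden, so only this event type is handled.
    async def on_message_create(self, event):
        return f"handled {event}"


proc = OnlyMessages()
handler = proc.resolve_handler("on_message_create")
print(asyncio.run(handler("m1")))            # handled m1
print(proc.resolve_handler("on_schedule"))   # SkipReason.no_handler
```

In practice this means a processor pays nothing for event types it does not care about: leave the handler undefined and the invocation is recorded as skipped.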

on_message_create(event: MessageCreateEvent)

Fires when a new message arrives on a channel this processor is subscribed to.

async def on_message_create(self, event: MessageCreateEvent):
    channel_name = event.channel.name
    data = event.message.data

    if channel_name == "sensor_readings":
        await self.process_sensor_data(data)

on_aggregate_update(event: AggregateUpdateEvent)

Fires when a channel's aggregate state is updated.

async def on_aggregate_update(self, event: AggregateUpdateEvent):
    new_state = event.aggregate.data
    previous_request = event.request_data.data
    # React to the state change

Example

A device processor that filters by channel name, extracts sensor data from the aggregate, updates the UI, and pings the connection status:

# Only the names used in this handler are imported
from pydoover.models import AggregateUpdateEvent, ConnectionStatus

async def on_aggregate_update(self, event: AggregateUpdateEvent):
    if event.channel.name == "deployment_config":
        await self.publish_ui_schema()
        return

    if event.channel.name != "device_uplink":
        return

    aggregate = event.aggregate.data
    readings = aggregate["data"]["readings"]
    graph_data = self._build_graph_data(readings)

    await self.api.update_channel_aggregate("ui_state", graph_data)
    await self.api.create_message("ui_state", graph_data)

    await self.ping_connection(
        connection_status=ConnectionStatus.continuous_online,
    )

on_deployment(event: DeploymentEvent)

Fires when the application is deployed or redeployed to an agent. The framework automatically publishes the UI schema on deployment (for non-static UIs), so you do not need to do that yourself.

Override this to run additional one-off setup such as seeding channels or priming tag values.

async def on_deployment(self, event: DeploymentEvent):
    # Create a channel that this processor will write to
    await self.api.create_channel("processed_data", is_private=True)

    # Set initial tag values
    await self.set_tag("last_deployed", event.app_display_name)

Example

A device processor that configures connection tracking on deployment so the portal shows online/offline status:

from pydoover.models import (
    DeploymentEvent, ConnectionStatus,
    ConnectionConfig, ConnectionType,
)
from pydoover.models.data.connection import ConnectionDisplay

async def on_deployment(self, event: DeploymentEvent):
    await self.publish_ui_schema()
    await self.api.update_connection_config(ConnectionConfig(
        connection_type=ConnectionType.continuous,
        offline_after=3600,
        display=ConnectionDisplay.always,
    ))
    await self.ping_connection(
        connection_status=ConnectionStatus.continuous_online,
    )

on_schedule(event: ScheduleEvent)

Fires when a cron or rate schedule triggers.

async def on_schedule(self, event: ScheduleEvent):
    # Generate a daily report
    await self.generate_report()

Example

A processor that uses scheduled invocations to generate dummy sensor data for testing. Note the use of allow_invoking_channel=True to write to a channel the processor is itself subscribed to:

async def on_schedule(self, event: ScheduleEvent):
    if not self.config.test_mode.value:
        return

    await self.api.create_message(
        "sensor_uplink",
        self._generate_dummy_data(),
        allow_invoking_channel=True,
    )

on_ingestion_endpoint(event: IngestionEndpointEvent)

Fires when an external HTTP request is received at the processor's ingestion endpoint.

The payload is automatically decoded from base64 and parsed as JSON by default. Override parse_ingestion_event_payload if your payload uses a different format (e.g., a C-packed struct).
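As a rough illustration of the two formats mentioned above, the sketch below decodes a base64 body as JSON and, alternatively, as a packed binary struct. The function names, the frame layout (`<IHf`), and the sample values are assumptions made for illustration. The signature of parse_ingestion_event_payload is not shown on this page, so check it in pydoover before writing an override.

```python
import base64
import json
import struct


def decode_json_payload(body_b64: str):
    """Mimic the default behaviour described above: base64, then JSON."""
    return json.loads(base64.b64decode(body_b64))


# Hypothetical device frame: little-endian uint32 serial number,
# uint16 battery voltage in millivolts, float32 temperature.
FRAME = struct.Struct("<IHf")


def decode_struct_payload(body_b64: str) -> dict:
    """Decode a base64 body that carries one packed FRAME instead of JSON."""
    serial, battery_mv, temperature = FRAME.unpack(base64.b64decode(body_b64))
    return {
        "serial_number": serial,
        "battery_mv": battery_mv,
        "temperature": temperature,
    }


# Build two sample request bodies, one per format.
json_body = base64.b64encode(json.dumps({"serial_number": 1234}).encode()).decode()
struct_body = base64.b64encode(FRAME.pack(1234, 3700, 21.5)).decode()

print(decode_json_payload(json_body))
print(decode_struct_payload(struct_body))
```

Returning a plain dict from the struct decoder keeps the handler code identical regardless of which wire format the device uses.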

async def on_ingestion_endpoint(self, event: IngestionEndpointEvent):
    # event.payload is the parsed request body
    device_data = event.payload
    await self.api.create_message("external_data", device_data)

Example

An integration processor that bridges external hardware monitors to Doover. It receives HTTP data via the ingestion endpoint, looks up the target device agent by serial number, and routes the data to the correct agent's channel using cross-agent messaging:

async def on_ingestion_endpoint(self, event: IngestionEndpointEvent):
    payload = event.payload
    if not isinstance(payload, dict) or "serial_number" not in payload:
        return

    serial_num = int(payload["serial_number"])
    origin = payload.get("origin", "")

    if origin == "device_monitor":
        agent_id = self._lookup_agent_id(serial_num)
        if agent_id is None:
            return
        await self.api.create_message(
            "device_uplink", payload, agent_id=agent_id,
        )
        await self.api.update_channel_aggregate(
            "device_uplink", payload, agent_id=agent_id,
        )
    elif origin.startswith("doover"):
        self._send_command_to_external_api(payload)

This pattern — ingestion endpoint + serial number lookup + cross-agent write — is common for integrations that bridge between external hardware APIs and Doover device agents.

on_manual_invoke(event: ManualInvokeEvent)

Fires when the processor is triggered manually from the UI or CLI.

async def on_manual_invoke(self, event: ManualInvokeEvent):
    action = event.payload.get("action")
    if action == "recalculate":
        await self.recalculate_all()

Filtering

Processors support two filtering stages that let you reject events before the handler runs, reducing execution time and API costs.

pre_hook_filter(event) -> bool

Runs before any API calls or setup. The event is the typed event object (e.g., MessageCreateEvent). Return False to skip processing. This is the cheapest rejection point.

async def pre_hook_filter(self, event):
    # Only process messages from the "temperature" channel
    if hasattr(event, "channel") and event.channel.name != "temperature":
        return False
    return True

Because the pre-hook runs before the token upgrade and tag loading, you cannot access self.config, self.tags, or make API calls here. You can only inspect the raw event data.

post_setup_filter(event) -> bool

Runs after setup completes. The full processor state is available: tags, config, and the API client. Return False to skip processing.

async def post_setup_filter(self, event):
    # Skip if the processor is in maintenance mode (a tag value)
    if self.get_tag("maintenance_mode"):
        return False
    return True

Use this when the filtering decision depends on configuration or tag state that is not available in the pre-hook.

Pre-hook filtering is significantly cheaper than post-setup filtering because it avoids the token upgrade and configuration loading API calls. Use the pre-hook for simple payload-based checks and reserve the post-setup filter for context-dependent decisions.
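The ordering of the two filters relative to setup can be pictured with a toy model. This is not pydoover's implementation: ToyProcessor, internal_setup, run_once, and the calls list are hypothetical names, and events are plain dicts rather than typed event objects. Only the stage order and the skip reason names are taken from the text above.

```python
import asyncio
from enum import Enum


class SkipReason(Enum):
    # Stand-in enum; member names mirror the skip reasons named on this page.
    pre_hook_filter = "pre_hook_filter"
    post_setup_filter = "post_setup_filter"


class ToyProcessor:
    def __init__(self, maintenance_mode: bool):
        self.calls = []  # records which stages ran, in order
        self._stored_tags = {"maintenance_mode": maintenance_mode}
        self.tags = None  # not loaded until the setup stage

    async def pre_hook_filter(self, event: dict) -> bool:
        self.calls.append("pre_hook_filter")
        return event.get("channel") == "temperature"

    async def internal_setup(self) -> None:
        # Stands in for the expensive stage: token upgrade, tag and config loading.
        self.calls.append("setup")
        self.tags = dict(self._stored_tags)

    async def post_setup_filter(self, event: dict) -> bool:
        self.calls.append("post_setup_filter")
        return not self.tags["maintenance_mode"]

    async def handler(self, event: dict) -> None:
        self.calls.append("handler")


async def run_once(proc: ToyProcessor, event: dict):
    """Return a SkipReason if the event was rejected, else None."""
    if not await proc.pre_hook_filter(event):
        return SkipReason.pre_hook_filter
    await proc.internal_setup()
    if not await proc.post_setup_filter(event):
        return SkipReason.post_setup_filter
    await proc.handler(event)
    return None


rejected_early = ToyProcessor(maintenance_mode=False)
early_result = asyncio.run(run_once(rejected_early, {"channel": "humidity"}))

rejected_late = ToyProcessor(maintenance_mode=True)
late_result = asyncio.run(run_once(rejected_late, {"channel": "temperature"}))

accepted = ToyProcessor(maintenance_mode=False)
ok_result = asyncio.run(run_once(accepted, {"channel": "temperature"}))

print(early_result, rejected_early.calls)
print(late_result, rejected_late.calls)
print(ok_result, accepted.calls)
```

The first run never reaches the setup stage, which is the saving the pre-hook is meant to provide. The second run pays for setup before it is rejected.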

Tag Operations

Tags are persistent key-value pairs scoped to the processor's app key. They survive across invocations and are stored in the tag_values channel.

get_tag(key, default=None)

Read a tag value. Returns the default if the tag does not exist.

count = self.get_tag("invocation_count", default=0)

set_tag(key, value, log=False)

Set a tag value. Pass log=True to record the change in the tag history.

await self.set_tag("invocation_count", count + 1)
await self.set_tag("last_alert_time", "2026-01-15T10:30:00Z", log=True)

Tag changes are committed automatically at the end of the invocation. You do not need to call a commit method.

Self-Loop Protection

If a processor is subscribed to the tag_values channel and the incoming event was caused by this processor's own tag writes, the framework automatically skips the invocation with SkipReason.tag_values_self_loop. This prevents infinite recursion.

Notifications

send_notification(message, title=None, severity=None, topic=None, agent_id=None)

Send a notification through the platform's notification system. The notification is fanned out to any subscriptions (email, SMS, web push, HTTP webhook) that match the severity and topic.

from pydoover.models import NotificationSeverity

await self.send_notification(
    "Temperature exceeded 40C on sensor 3",
    title="High Temperature Alert",
    severity=NotificationSeverity.warning,
    topic="temperature",
)

You can also pass a pre-constructed Notification object for full control over the notification payload.

UI Schema Publishing

publish_ui_schema(clear=True)

Publishes the processor's UI schema to the ui_state channel aggregate. This is called automatically on deployment events (for non-static UIs), so you rarely need to call it manually.

When clear=True (the default), the existing UI state for this app key is replaced entirely. When clear=False, the schema is merged into the existing state.

# Force a UI schema republish
await self.publish_ui_schema()
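The difference between the two clear modes can be sketched on plain dictionaries. Everything in this block is an assumption made for illustration: the publish_schema helper is hypothetical, the ui_state aggregate is modelled as a dict keyed by app key, and the merge is shown as a shallow key-by-key update, which may not match how the platform actually merges.

```python
import copy


def publish_schema(ui_state: dict, app_key: str, schema: dict, clear: bool = True) -> dict:
    """Return a new ui_state with `schema` published under `app_key`."""
    new_state = copy.deepcopy(ui_state)
    if clear or app_key not in new_state:
        # clear=True: replace whatever was stored for this app key.
        new_state[app_key] = copy.deepcopy(schema)
    else:
        # clear=False: assumed here to be a shallow merge into the existing entry.
        new_state[app_key].update(copy.deepcopy(schema))
    return new_state


existing = {"my_app": {"temp_display": {"units": "C"}, "old_widget": {}}}
schema = {"temp_display": {"units": "C", "precision": 1}}

replaced = publish_schema(existing, "my_app", schema, clear=True)
merged = publish_schema(existing, "my_app", schema, clear=False)

print(sorted(replaced["my_app"]))  # ['temp_display']
print(sorted(merged["my_app"]))    # ['old_widget', 'temp_display']
```

Under these assumptions, clear=True is the safer default after a UI redesign because stale elements such as old_widget do not linger.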

Connection Tracking

ping_connection(online_at=None, connection_status=..., offline_at=None)

Report the connection status for this agent. This updates the doover_connection channel, which the platform uses for online/offline monitoring.

Parameter           Type               Description
online_at           datetime or None   When the agent was last online. Defaults to datetime.now(tz=timezone.utc).
connection_status   ConnectionStatus   The connection status enum value. Defaults to ConnectionStatus.periodic_unknown.
offline_at          datetime or None   When the agent should be marked offline. If provided, offline_after is calculated as the delta between online_at and offline_at.

If offline_at is not provided, the offline_after threshold is read from the agent's connection configuration (defaulting to 1 hour). The method automatically determines whether the agent is online or offline based on whether the current time exceeds the offline_after window.

from datetime import datetime, timezone

await self.ping_connection(
    online_at=datetime.now(tz=timezone.utc),
)
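The offline_after rules described above reduce to a little date arithmetic. The sketch below restates them with hypothetical helper names (offline_after_window, is_online). Treating the exact boundary as still online is an assumption, and none of this is the framework's own code.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# One hour is the default stated in the text above.
DEFAULT_OFFLINE_AFTER = timedelta(hours=1)


def offline_after_window(
    online_at: datetime,
    offline_at: Optional[datetime] = None,
    configured: Optional[timedelta] = None,
) -> timedelta:
    """Pick the offline_after window the way the text above describes."""
    if offline_at is not None:
        # An explicit offline_at wins: the window is the delta to online_at.
        return offline_at - online_at
    # Otherwise use the agent's configured value, falling back to one hour.
    return configured if configured is not None else DEFAULT_OFFLINE_AFTER


def is_online(online_at: datetime, now: datetime, window: timedelta) -> bool:
    """Online while the time since online_at is within the window (boundary assumed inclusive)."""
    return now - online_at <= window


online_at = datetime(2026, 1, 15, 10, 0, tzinfo=timezone.utc)
two_hours_later = online_at + timedelta(hours=2)

explicit = offline_after_window(online_at, offline_at=online_at + timedelta(hours=15))
fallback = offline_after_window(online_at)

print(explicit)                                         # 15:00:00
print(is_online(online_at, two_hours_later, explicit))  # True
print(is_online(online_at, two_hours_later, fallback))  # False
```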

For periodic devices that wake on a schedule, pass offline_at to tell the platform when the next check-in is expected. This lets the portal show the device as offline only if it misses its window.

from datetime import datetime, timezone, timedelta

from pydoover.models import ConnectionStatus

now = datetime.now(timezone.utc)
sleep_hours = 6
# Allow 2.5 sleep cycles (15 hours here) before the device is marked offline
offline_at = now + timedelta(hours=sleep_hours * 2.5)

await self.ping_connection(
    online_at=now,
    connection_status=ConnectionStatus.periodic_unknown,
    offline_at=offline_at,
)

Connection Tracking Patterns

Connection tracking combines update_connection_config (sets the connection type and thresholds) with ping_connection (reports each check-in). Two common patterns:

Continuous — for processors bridging always-on data streams. Set once on deployment:

from pydoover.models import (
    ConnectionConfig, ConnectionType, ConnectionStatus,
)
from pydoover.models.data.connection import ConnectionDisplay

async def on_deployment(self, event):
    await self.api.update_connection_config(ConnectionConfig(
        connection_type=ConnectionType.continuous,
        offline_after=3600,
        display=ConnectionDisplay.always,
    ))
    await self.ping_connection(
        connection_status=ConnectionStatus.continuous_online,
    )

Periodic — for battery-powered devices that wake on a schedule. Recalculate offline_after each time the device reports, based on its sleep interval:

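from datetime import datetime, timedelta, timezone

from pydoover.models import ConnectionConfig, ConnectionType

OFFLINE_AFTER_MULTIPLIER = 2.5

async def _publish_connection_config(self, sleep_min: int):
    # Current UTC time, used to compute the next expected wake-up
    now = datetime.now(timezone.utc)
    interval_secs = sleep_min * 60
    config = ConnectionConfig(
        connection_type=ConnectionType.periodic,
        expected_interval=interval_secs,
        # Tolerate 2.5 missed intervals before marking the device offline
        offline_after=interval_secs * OFFLINE_AFTER_MULTIPLIER,
        sleep_time=interval_secs,
        # Next expected wake-up as a millisecond epoch timestamp
        next_wake_time=int(
            (now + timedelta(seconds=interval_secs)).timestamp() * 1000
        ),
    )
    await self.api.update_connection_config(config)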
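To make the arithmetic in the periodic pattern concrete, here is the same calculation as a standalone function with a fixed clock. The periodic_timings name and the 30-minute sleep interval are illustrative assumptions; the formulas are the ones used in the snippet above.

```python
from datetime import datetime, timedelta, timezone

OFFLINE_AFTER_MULTIPLIER = 2.5


def periodic_timings(sleep_min: int, now: datetime) -> dict:
    """Compute the timing fields for a device that sleeps `sleep_min` minutes."""
    interval_secs = sleep_min * 60
    return {
        "expected_interval": interval_secs,
        "offline_after": interval_secs * OFFLINE_AFTER_MULTIPLIER,
        # Millisecond epoch timestamp of the next expected wake-up
        "next_wake_time": int(
            (now + timedelta(seconds=interval_secs)).timestamp() * 1000
        ),
    }


now = datetime(2026, 1, 15, 10, 30, tzinfo=timezone.utc)
timings = periodic_timings(30, now)

print(timings["expected_interval"])  # 1800
print(timings["offline_after"])      # 4500.0
```

A device sleeping for 30 minutes is therefore expected every 1800 seconds and is only treated as offline after 4500 seconds, that is, after missing two and a half check-ins.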

Channel Fetching

fetch_channel(channel_name)

Convenience method to fetch a channel by name on the current agent.

channel = await self.fetch_channel("sensor_readings")

Invocation Summary

After every invocation, the framework automatically publishes a summary message to the configured invocation target channels. The summary includes:

  • app_key, app_id, agent_id
  • event_type (e.g., "on_message_create")
  • started_at (millisecond timestamp)
  • duration_ms
  • status ("success", "skipped", or "error")
  • skip_reason (if skipped: no_handler, pre_hook_filter, tag_values_self_loop, or post_setup_filter)
  • error (type and message, if an error occurred)

The target channels are configured via ProcessorConfig.inv_targets. By default, the summary is published to a channel named dv-proc-inv-{app_id} on the processor's own agent.
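As a sketch of what such a summary might look like, the helper below assembles the listed fields into a dictionary and derives the default channel name. The field names and the channel pattern come from the text above. The build_summary and default_target_channel helpers, the precedence of error over skipped, and the exact shape of the error entry are assumptions.

```python
from typing import Optional


def build_summary(
    app_key: str,
    app_id: str,
    agent_id: int,
    event_type: str,
    started_at: int,
    duration_ms: int,
    skip_reason: Optional[str] = None,
    error: Optional[BaseException] = None,
) -> dict:
    """Assemble an invocation summary with the fields listed above."""
    if error is not None:
        status = "error"
    elif skip_reason is not None:
        status = "skipped"
    else:
        status = "success"

    summary = {
        "app_key": app_key,
        "app_id": app_id,
        "agent_id": agent_id,
        "event_type": event_type,
        "started_at": started_at,  # millisecond timestamp
        "duration_ms": duration_ms,
        "status": status,
    }
    if skip_reason is not None:
        summary["skip_reason"] = skip_reason
    if error is not None:
        # Assumed shape: exception class name plus its message.
        summary["error"] = {"type": type(error).__name__, "message": str(error)}
    return summary


def default_target_channel(app_id: str) -> str:
    """Default summary channel name, following the pattern stated above."""
    return f"dv-proc-inv-{app_id}"


skipped = build_summary(
    "temp_monitor", "app-123", 42, "on_message_create",
    started_at=1768473000000, duration_ms=3, skip_reason="pre_hook_filter",
)
failed = build_summary(
    "temp_monitor", "app-123", 42, "on_message_create",
    started_at=1768473000000, duration_ms=120, error=ValueError("bad reading"),
)

print(default_target_channel("app-123"))  # dv-proc-inv-app-123
print(skipped["status"], failed["status"])  # skipped error
```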

ProcessorSkipped and SkipReason

When a processor invocation is intentionally skipped, a ProcessorSkipped exception is raised internally. This is not an error; it is the normal mechanism for recording why an invocation did not execute its handler.

The SkipReason enum has four values:

Reason                 When it occurs
no_handler             The event type has no overridden handler method
pre_hook_filter        pre_hook_filter() returned False
tag_values_self_loop   The event was caused by this processor's own tag writes
post_setup_filter      post_setup_filter() returned False

Skipped invocations are tracked separately from errors in the invocation summary.

Complete Example

This example shows a processor that monitors temperature readings, sends alerts when thresholds are exceeded, and tracks state via tags.

from pydoover.processor import Application, run_app
from pydoover import config
from pydoover.tags import Tags, Number, Boolean
from pydoover.models import MessageCreateEvent, DeploymentEvent, NotificationSeverity


class Config(config.Schema):
    high_temp_threshold = config.Number(
        "High Temperature Threshold",
        default=40.0,
        description="Temperature above which an alert is sent.",
    )
    site_name = config.String("Site Name", default="Unnamed Site")


class AppTags(Tags):
    last_temperature = Number(default=0.0)
    alert_active = Boolean(default=False)


class TemperatureMonitor(Application):
    config_cls = Config
    tags_cls = AppTags

    async def pre_hook_filter(self, event):
        # Only process messages from the temperature channel
        if hasattr(event, "channel"):
            return event.channel.name == "temperature"
        return True

    async def setup(self):
        self.threshold = self.config.high_temp_threshold.value
        self.site = self.config.site_name.value

    async def on_message_create(self, event: MessageCreateEvent):
        temp = event.message.data.get("value")
        if temp is None:
            return

        await self.set_tag("last_temperature", temp)

        if temp > self.threshold and not self.get_tag("alert_active"):
            await self.set_tag("alert_active", True)
            await self.send_notification(
                f"Temperature at {self.site} is {temp}C (threshold: {self.threshold}C)",
                title="High Temperature Alert",
                severity=NotificationSeverity.warning,
            )
        elif temp <= self.threshold and self.get_tag("alert_active"):
            await self.set_tag("alert_active", False)

    async def on_deployment(self, event: DeploymentEvent):
        # Reset the alert flag on every deployment or redeployment
        await self.set_tag("alert_active", False)


# Lambda entry point
app = TemperatureMonitor()

def handler(event, context):
    return run_app(app, event, context)
