
Event Types

Each processor invocation is triggered by a specific event type. The framework parses the raw event payload into a typed event object and routes it to the corresponding handler method on your Application subclass.

This page covers all six event types, their fields, and practical handler patterns.

MessageCreateEvent

Fires when a new message is published to a channel that this processor is subscribed to. This is the most common event type.

Fields

Field | Type | Description
channel | ChannelID | The channel the message was published to. Has name, id, and owner_id attributes.
message | Message | The message object. Has id, data, channel, created_at, and files attributes.

Handler

from pydoover.models import MessageCreateEvent

async def on_message_create(self, event: MessageCreateEvent):
    channel_name = event.channel.name
    data = event.message.data

    if channel_name == "sensor_readings":
        temperature = data.get("temperature")
        humidity = data.get("humidity")
        await self.process_readings(temperature, humidity)

The event.message.data dictionary contains the payload that was published to the channel. The event.channel.name attribute identifies the channel the message arrived on, which matters when a processor subscribes to multiple channels.
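When a processor subscribes to several channels, the if chain in the handler can be replaced by a lookup table keyed on channel name. The following is a minimal sketch of that idea, not part of pydoover: FakeChannel, FakeMessage, FakeEvent, ChannelRouter and the "alerts" channel are hypothetical stand-ins so the example runs without the framework.

```python
import asyncio
from dataclasses import dataclass, field


# Hypothetical stand-ins for the real event objects, so the sketch runs alone.
@dataclass
class FakeChannel:
    name: str


@dataclass
class FakeMessage:
    data: dict = field(default_factory=dict)


@dataclass
class FakeEvent:
    channel: FakeChannel
    message: FakeMessage


class ChannelRouter:
    """Routes a message event to a per-channel coroutine by channel name."""

    def __init__(self):
        self.seen = []  # records what each handler received, for illustration
        self._routes = {
            "sensor_readings": self._on_sensor_readings,
            "alerts": self._on_alert,
        }

    async def on_message_create(self, event):
        handler = self._routes.get(event.channel.name)
        if handler is None:
            return False  # no handler registered for this channel
        await handler(event.message.data)
        return True

    async def _on_sensor_readings(self, data):
        self.seen.append(("sensor_readings", data.get("temperature")))

    async def _on_alert(self, data):
        self.seen.append(("alerts", data.get("level")))


def route_demo():
    router = ChannelRouter()
    events = [
        FakeEvent(FakeChannel("sensor_readings"), FakeMessage({"temperature": 21.5})),
        FakeEvent(FakeChannel("unknown"), FakeMessage({})),
    ]
    handled = [asyncio.run(router.on_message_create(e)) for e in events]
    return handled, router.seen
```

Adding a channel then means adding one dictionary entry and one coroutine, and messages from unregistered channels are ignored instead of falling through a chain of conditions.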

Pre-Hook Filter Patterns

Filtering by channel name is the most common pre-hook pattern for message events.

async def pre_hook_filter(self, event):
    # Only process messages from specific channels
    if hasattr(event, "channel"):
        return event.channel.name in ("temperature", "humidity", "pressure")
    return True

You can also filter by checking specific fields in the message payload.

async def pre_hook_filter(self, event):
    if hasattr(event, "message"):
        # Skip messages that do not contain a "reading" key
        return "reading" in event.message.data
    return True
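The two filters above can be combined, and it helps to see how the hasattr guards behave for events that carry no channel or message. This is a sketch under assumptions: it uses a module-level function rather than a method, and make_event builds hypothetical SimpleNamespace stand-ins, not real pydoover events.

```python
import asyncio
from types import SimpleNamespace

ALLOWED_CHANNELS = ("temperature", "humidity", "pressure")


async def pre_hook_filter(event):
    # Events without a channel (a schedule event, say) are let through.
    if not hasattr(event, "channel"):
        return True
    if event.channel.name not in ALLOWED_CHANNELS:
        return False
    # Events with a channel but no message (an aggregate update, say) pass.
    if not hasattr(event, "message"):
        return True
    return "reading" in event.message.data


def make_event(channel=None, data=None):
    """Build a hypothetical stand-in event; these are not pydoover objects."""
    attrs = {}
    if channel is not None:
        attrs["channel"] = SimpleNamespace(name=channel)
    if data is not None:
        attrs["message"] = SimpleNamespace(data=data)
    return SimpleNamespace(**attrs)


def filter_demo():
    events = [
        make_event("temperature", {"reading": 20.1}),  # passes both checks
        make_event("temperature", {"note": "offline"}),  # no "reading" key
        make_event("battery", {"reading": 3.7}),  # channel not allowed
        make_event(),  # no channel at all
    ]
    return [asyncio.run(pre_hook_filter(e)) for e in events]
```

The final return True in the original filters plays the same role as the first guard here: event types that the filter does not understand are passed on to their handlers rather than dropped.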

AggregateUpdateEvent

Fires when a channel's aggregate state is updated. Unlike MessageCreateEvent, which delivers individual messages, this event delivers the full merged state of the channel after the update.

Fields

Field | Type | Description
author_id | int | The ID of the entity that triggered the update
channel | ChannelID | The channel whose aggregate was updated
aggregate | Aggregate | The full aggregate state after the update. Access the data via aggregate.data.
request_data | Aggregate | The data that was sent in the update request (the delta). Access via request_data.data.
organisation_id | int | The organisation that owns the channel

Handler

from pydoover.models import AggregateUpdateEvent

async def on_aggregate_update(self, event: AggregateUpdateEvent):
    # The full current state of the channel
    current_state = event.aggregate.data

    # What was actually changed in this update
    changed_data = event.request_data.data

    if "status" in changed_data:
        new_status = changed_data["status"]
        await self.handle_status_change(new_status)

The distinction between aggregate (full state) and request_data (delta) is important. Use request_data when you only care about what changed, and aggregate when you need the complete picture.
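One way to apply this split is to decide whether to act from the delta, and to read supporting values from the full state. The sketch below works on plain dictionaries, as they might be obtained from request_data.data and aggregate.data. The decide_action helper and the "temperature" and "limit" keys are hypothetical.

```python
def decide_action(aggregate_data, request_data):
    """Return an action string, or None when the update can be ignored."""
    # The delta tells us whether this update touched the temperature at all.
    if "temperature" not in request_data:
        return None
    # The limit was probably set by an earlier update, so read it from the
    # full state rather than from the delta.
    limit = aggregate_data.get("limit")
    if limit is None:
        return None
    return "alarm" if request_data["temperature"] > limit else "ok"


full_state = {"temperature": 31, "limit": 30, "status": "running"}

hot = decide_action(full_state, {"temperature": 31})
unrelated = decide_action(full_state, {"status": "idle"})
cool = decide_action({**full_state, "temperature": 25}, {"temperature": 25})
```

Here an update that only changes status is ignored, even though the full state still contains a temperature above the limit.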

Note

Both MessageCreateEvent and AggregateUpdateEvent set the anti-recursion guard on the ProcessorDataClient. If your handler tries to write back to the same channel that triggered the event, the client will raise a RuntimeError unless you explicitly pass allow_invoking_channel=True.
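The behaviour of the guard can be illustrated with a small stand-in. GuardedClient below is hypothetical and synchronous for brevity. It is not the ProcessorDataClient, and it assumes that allow_invoking_channel is passed as a keyword argument on the write call, which this page does not spell out.

```python
class GuardedClient:
    """Hypothetical stand-in that mimics the guard described above."""

    def __init__(self, invoking_channel):
        self.invoking_channel = invoking_channel
        self.sent = []

    def create_message(self, channel, data, allow_invoking_channel=False):
        # Writing to the channel that triggered this invocation would trigger
        # the processor again, so it is refused unless explicitly allowed.
        if channel == self.invoking_channel and not allow_invoking_channel:
            raise RuntimeError(f"refusing to write to invoking channel {channel!r}")
        self.sent.append((channel, data))


def guard_demo():
    client = GuardedClient(invoking_channel="sensor_readings")
    outcomes = []

    client.create_message("daily_summary", {"n": 1})
    outcomes.append("other channel ok")

    try:
        client.create_message("sensor_readings", {"n": 2})
    except RuntimeError:
        outcomes.append("blocked")

    client.create_message("sensor_readings", {"n": 3}, allow_invoking_channel=True)
    outcomes.append("override ok")
    return outcomes, client.sent
```

If a handler does opt in with allow_invoking_channel=True, it should have its own stopping condition, such as only writing when a value actually changed. Otherwise each write can trigger another invocation.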

DeploymentEvent

Fires when the application is deployed or redeployed to an agent. This includes initial deployments and configuration updates that trigger a redeployment.

Fields

Field | Type | Description
agent_id | int | The agent the app was deployed to
app_id | int | The application ID
app_install_id | int | The specific installation ID
app_key | str | The application key
app_display_name | str | The human-readable application name

Handler

The framework automatically publishes the UI schema on deployment (for non-static UIs). Override on_deployment to perform additional one-off setup.

from datetime import datetime, timezone

from pydoover.models import DeploymentEvent

async def on_deployment(self, event: DeploymentEvent):
    # Create channels this processor will use
    await self.api.create_channel("daily_reports", is_private=True)

    # Publish the UI schema (already done by the framework, but shown for clarity)
    # await self.publish_ui_schema()

    # Record the deployment time (UTC, ISO 8601) rather than a fixed literal
    deployed_at = datetime.now(timezone.utc).isoformat()
    await self.set_tag("last_deployed_at", deployed_at, log=True)

Note

Unlike other event types, on_deployment always runs the framework's internal logic (UI schema publishing) even if you do not override the handler. This is the only event type where the framework does work regardless of whether a user handler exists.

ScheduleEvent

Fires when a cron or rate schedule triggers. Schedules are configured through the ScheduleConfig element in the processor's configuration schema.

Fields

Field | Type | Description
schedule_id | int | The ID of the schedule that fired

Handler

from pydoover.models import ScheduleEvent

async def on_schedule(self, event: ScheduleEvent):
    # Generate a periodic report
    readings = await self.api.list_messages(
        "sensor_readings",
        limit=100,
    )
    summary = self.calculate_summary(readings)
    await self.api.create_message("daily_summary", summary)

Schedule events arrive from EventBridge rather than SNS. The schedule frequency is defined in the processor's config schema using ScheduleConfig, which supports cron, rate, and disabled modes.

from pydoover.processor import ScheduleConfig
from pydoover.config import Schema

class Config(Schema):
    schedule = ScheduleConfig(
        "Report Schedule",
        description="When to generate the daily report",
    )

This adds a schedule configuration field that the operator can set through the Doover UI. The platform creates the corresponding EventBridge rule automatically.
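The on_schedule handler above calls a calculate_summary helper that this page does not define. One possible shape for it is sketched below as a plain function. It assumes each reading is a dictionary that may hold a numeric value under a given key. The real return type of list_messages is not documented here, so the message payloads may need to be extracted first.

```python
from statistics import mean


def calculate_summary(readings, key="temperature"):
    """Summarise the numeric values stored under `key` in a list of dicts."""
    # Skip readings where the key is missing or holds a non-numeric value.
    values = [r[key] for r in readings if isinstance(r.get(key), (int, float))]
    if not values:
        return {"count": 0}
    return {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "mean": round(mean(values), 2),
    }


summary = calculate_summary(
    [{"temperature": 20.0}, {"temperature": 22.0}, {"humidity": 40}]
)
empty_summary = calculate_summary([])
```

Returning a small dictionary even for an empty window means the schedule handler can always publish something, which makes a gap in the data visible downstream.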

IngestionEndpointEvent

Fires when an external HTTP request is received at the processor's ingestion endpoint. Use it to receive data from third-party systems that push over HTTP, for example external hardware monitors (weather stations, grain storage monitors, industrial gateways) that POST telemetry to a Doover endpoint rather than running a Doover device agent directly. The processor acts as a bridge, validating the payload and routing data to the appropriate device agent's channel.

Fields

Field | Type | Description
ingestion_id | int | The ingestion endpoint ID
agent_id | int | The agent this endpoint belongs to
organisation_id | int | The organisation that owns the agent
payload | Any | The parsed request body (base64-decoded and JSON-parsed by default)

Handler

from pydoover.models import IngestionEndpointEvent

async def on_ingestion_endpoint(self, event: IngestionEndpointEvent):
    # The payload is automatically decoded from base64 and parsed as JSON
    device_id = event.payload.get("device_id")
    readings = event.payload.get("readings", [])

    for reading in readings:
        await self.api.create_message(
            f"external_{device_id}",
            reading,
        )

Custom Payload Parsing

By default, the framework decodes the payload from base64 and parses it as JSON. If your external system sends data in a different format, override parse_ingestion_event_payload.

import struct
import base64

def parse_ingestion_event_payload(self, payload: str):
    # Decode the base64 wrapper (always required)
    raw_bytes = base64.b64decode(payload)
    # Parse a C-packed struct instead of JSON
    temp, humidity, pressure = struct.unpack("<fff", raw_bytes)
    return {"temperature": temp, "humidity": humidity, "pressure": pressure}

Warning

The Doover platform always wraps the incoming HTTP body in base64 encoding. You must decode the base64 layer first, regardless of the underlying format. The default implementation handles this for you.
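A custom parser such as the struct example above can be tested without the platform by simulating the base64 wrapper locally. The sketch below is a standalone variant that also checks the frame length before unpacking. parse_packed_payload and encode_frame are hypothetical helper names, and the three-float layout is the one assumed in the example above.

```python
import base64
import struct

FRAME = struct.Struct("<fff")  # three little-endian 32-bit floats, 12 bytes


def parse_packed_payload(payload: str) -> dict:
    """Decode the base64 layer, then unpack a fixed-size binary frame."""
    raw = base64.b64decode(payload)
    if len(raw) != FRAME.size:
        raise ValueError(f"expected {FRAME.size} bytes, got {len(raw)}")
    temp, humidity, pressure = FRAME.unpack(raw)
    return {"temperature": temp, "humidity": humidity, "pressure": pressure}


def encode_frame(temp, humidity, pressure) -> str:
    """Simulate what the parser receives: packed bytes wrapped in base64."""
    return base64.b64encode(FRAME.pack(temp, humidity, pressure)).decode("ascii")


decoded = parse_packed_payload(encode_frame(21.5, 40.0, 1013.25))
```

The explicit length check turns a truncated or oversized body into a clear ValueError rather than a struct.error raised from inside the unpack call.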

Example: Envelope Unwrapping

A processor overrides parse_ingestion_event_payload to unwrap JSON payloads that may arrive inside a {"data": ...} envelope:

import json
import base64

def parse_ingestion_event_payload(self, payload: str) -> dict:
    raw = base64.b64decode(payload)
    parsed = json.loads(raw)
    if isinstance(parsed, dict) and "data" in parsed and len(parsed) == 1:
        parsed = parsed["data"]
    return parsed
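The len(parsed) == 1 condition is easy to overlook: only a dictionary whose sole key is "data" is unwrapped. The sketch below repeats the same logic as a plain function, with a hypothetical wrap helper that simulates the platform's base64 layer, so the three cases can be compared side by side.

```python
import base64
import json


def unwrap(payload: str):
    """Same logic as the override above, written as a plain function."""
    parsed = json.loads(base64.b64decode(payload))
    if isinstance(parsed, dict) and "data" in parsed and len(parsed) == 1:
        return parsed["data"]
    return parsed


def wrap(obj) -> str:
    """Hypothetical helper: JSON-encode an object, then base64-wrap it."""
    return base64.b64encode(json.dumps(obj).encode("utf-8")).decode("ascii")


enveloped = unwrap(wrap({"data": {"t": 1}}))  # sole "data" key: unwrapped
with_meta = unwrap(wrap({"data": {"t": 1}, "meta": 2}))  # extra key: kept whole
bare_list = unwrap(wrap([1, 2]))  # not a dict: returned as parsed
```

Because the function can return whatever sits under "data", or a bare list, handlers should not assume the parsed payload is always a dictionary.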

Ingestion Configuration

The ingestion endpoint is configured through IngestionEndpointConfig in the processor's schema.

from pydoover.processor import IngestionEndpointConfig
from pydoover.config import Schema

class Config(Schema):
    ingestion = IngestionEndpointConfig(
        "External Data Ingestion",
        description="Receives data from the field gateway",
    )

This creates an ingestion endpoint with configurable CIDR ranges, signing key, and throttle settings.

ManualInvokeEvent

Fires when the processor is triggered manually from the Doover UI or CLI. This is useful for ad-hoc operations like recalculations, data migrations, or diagnostic actions.

Fields

Field | Type | Description
organisation_id | int | The organisation context
payload | dict | Arbitrary JSON payload passed by the invoker

Handler

import logging

from pydoover.models import ManualInvokeEvent

log = logging.getLogger(__name__)

async def on_manual_invoke(self, event: ManualInvokeEvent):
    action = event.payload.get("action")

    if action == "recalculate":
        await self.recalculate_aggregates()
    elif action == "export":
        await self.export_data(event.payload.get("format", "csv"))
    else:
        # Unknown action: record it so the operator can see why nothing ran
        log.warning("Unknown manual invoke action: %r", action)

The payload is entirely user-defined. Establish a convention (such as an action key) to route between different manual operations.
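As the number of manual operations grows, the action convention can be backed by a dispatch table instead of an if/elif chain. The sketch below is one way to do that. ManualActions and its methods are hypothetical, and the handler takes the payload dictionary directly so the example runs without pydoover.

```python
import asyncio


class ManualActions:
    """Maps the payload's action key to a coroutine; names are illustrative."""

    def __init__(self):
        self.log = []  # stands in for real logging and side effects
        self._actions = {
            "recalculate": self._recalculate,
            "export": self._export,
        }

    async def on_manual_invoke(self, payload):
        action = payload.get("action")
        handler = self._actions.get(action)
        if handler is None:
            self.log.append(f"unknown action: {action!r}")
            return False
        await handler(payload)
        return True

    async def _recalculate(self, payload):
        self.log.append("recalculated")

    async def _export(self, payload):
        self.log.append(f"exported as {payload.get('format', 'csv')}")


def manual_demo():
    app = ManualActions()
    results = [
        asyncio.run(app.on_manual_invoke({"action": "export"})),
        asyncio.run(app.on_manual_invoke({"action": "export", "format": "json"})),
        asyncio.run(app.on_manual_invoke({"action": "reboot"})),
    ]
    return results, app.log
```

The table also gives a single place to list the supported actions, which is useful when reporting an unknown action back to the invoker.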

Event Routing Summary

Event Type | Handler Method | Trigger Source | Anti-Recursion
MessageCreateEvent | on_message_create | SNS (channel subscription) | Yes
AggregateUpdateEvent | on_aggregate_update | SNS (channel subscription) | Yes
DeploymentEvent | on_deployment | SNS (deployment) | No
ScheduleEvent | on_schedule | EventBridge | No
IngestionEndpointEvent | on_ingestion_endpoint | SNS (ingestion) | No
ManualInvokeEvent | on_manual_invoke | Direct invocation | No
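Conceptually, the routing in this table is a lookup from event class to handler method name. The sketch below shows that idea for two of the six rows. It is not the framework's actual dispatch code: the Stub classes, HANDLER_NAMES, dispatch and DemoApp are all hypothetical, and skipping a missing handler is a choice made for this example only.

```python
import asyncio
from dataclasses import dataclass


# Stub event classes standing in for the typed events the framework builds.
@dataclass
class StubScheduleEvent:
    schedule_id: int


@dataclass
class StubManualInvokeEvent:
    organisation_id: int
    payload: dict


# Event class -> handler method name, mirroring two rows of the table above.
HANDLER_NAMES = {
    StubScheduleEvent: "on_schedule",
    StubManualInvokeEvent: "on_manual_invoke",
}


async def dispatch(app, event):
    """Call the handler that matches the event's type, if the app defines one."""
    name = HANDLER_NAMES.get(type(event))
    handler = getattr(app, name, None) if name else None
    if handler is None:
        return None  # unknown event type, or handler not implemented
    return await handler(event)


class DemoApp:
    """Implements only one handler; the other event type is skipped."""

    async def on_schedule(self, event):
        return f"schedule {event.schedule_id} fired"


def dispatch_demo():
    app = DemoApp()
    return [
        asyncio.run(dispatch(app, StubScheduleEvent(schedule_id=7))),
        asyncio.run(dispatch(app, StubManualInvokeEvent(1, {"action": "export"}))),
    ]
```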
