
Quick Start

This guide walks through three ways to use pydoover. Pick the path that matches your use case, or read all three for a complete picture.

1. Docker application (device-side)

Docker applications run on Doover edge devices. They follow a setup / main_loop pattern: setup() runs once when the app starts, and main_loop() runs repeatedly on a configurable interval (default: 1 second).

The example below declares a tag to track a counter, a UI variable to display it on the dashboard, and a main loop that increments the counter each cycle.

from pydoover.docker import Application, run_app
from pydoover.tags import Number, Tags
from pydoover import ui


class MyTags(Tags):
    """Declare persistent state as typed tags."""
    counter = Number(default=0)


class MyUI(ui.UI):
    """Declare dashboard elements. Bind the variable to the tag so
    the displayed value updates automatically."""
    counter = ui.NumericVariable(
        "counter",
        "Loop Counter",
        value=MyTags.counter,
    )


class MyApp(Application):
    tags_cls = MyTags
    ui_cls = MyUI

    def setup(self):
        """Runs once on startup. Use this for one-time initialisation
        such as configuring hardware interfaces or registering callbacks."""
        pass

    def main_loop(self):
        """Runs repeatedly. Each invocation should be fast and non-blocking.
        Set loop_target_period on the app instance to control the interval."""
        self.tags.counter.increment()


if __name__ == "__main__":
    run_app(MyApp())

Key points:

  • Tags persist across restarts. Number, Boolean, and String are the built-in types. See Tags for details.
  • UI elements are declared on a UI subclass and bound to tags with value=MyTags.counter. The framework syncs the schema to the cloud automatically. See UI for the full element set.
  • run_app() parses command-line arguments injected by the Doover runtime (app key, gRPC URIs, etc.), wires up the device agent, platform, and Modbus interfaces, then starts the async event loop. You do not need to configure these manually.

Docker apps require the grpc extra. Install it with pip install pydoover[grpc]. See Installation for details.

For hardware I/O, Modbus, event subscriptions, and shutdown handling, see the Docker Applications section.
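
To make the setup / main_loop pattern described in this section concrete, here is a minimal plain-Python sketch of a paced loop. It does not use pydoover and is not how run_app() is implemented. SketchApp and run_sketch are hypothetical names invented for illustration; only loop_target_period is borrowed from the text above.

```python
import time


class SketchApp:
    """Hypothetical stand-in for an application; not a pydoover class."""

    loop_target_period = 1.0  # target seconds between main_loop() starts

    def __init__(self):
        self.setup_calls = 0
        self.counter = 0

    def setup(self):
        self.setup_calls += 1

    def main_loop(self):
        self.counter += 1


def run_sketch(app, cycles, sleep=time.sleep, clock=time.monotonic):
    """Call setup() once, then main_loop() `cycles` times, sleeping for
    whatever is left of loop_target_period after each call."""
    app.setup()
    for _ in range(cycles):
        started = clock()
        app.main_loop()
        elapsed = clock() - started
        # Never sleep a negative amount if main_loop() overran the period.
        sleep(max(0.0, app.loop_target_period - elapsed))


app = SketchApp()
app.loop_target_period = 0.01  # shortened so the sketch finishes quickly
run_sketch(app, cycles=3)
print(app.setup_calls, app.counter)
```

The sketch also shows why each main_loop() call should be fast: time spent inside the call is subtracted from the wait, so a call that takes longer than the period delays every later cycle.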

2. Cloud processor (serverless)

Cloud processors run as AWS Lambda functions triggered by Doover events. They react to channel messages, scheduled events, deployment changes, and ingestion endpoint payloads.

The example below handles a MessageCreateEvent, the most common trigger. It reads the incoming message payload and creates a response on another channel.

from pydoover.processor import Application, run_app
from pydoover.models import MessageCreateEvent


class MyProcessor(Application):

    async def setup(self):
        """Runs before each invocation. Use this to load config,
        initialise state, or prepare resources."""
        pass

    async def on_message_create(self, event: MessageCreateEvent):
        """Called when a new message arrives on a subscribed channel.
        The event contains the channel, message payload, and metadata."""
        payload = event.message.payload
        channel_name = event.channel.name

        # Process the incoming data and write a result to another channel
        result = {"processed": True, "source": channel_name}
        await self.api.create_message(
            self.agent_id,
            "my_output_channel",
            payload=result,
        )


# Lambda handler entry point
def handler(event, context):
    run_app(MyProcessor(), event, context)

Key points:

  • Event handlers are async methods you override on Application. The framework dispatches to the right handler based on the event type. Available handlers include on_message_create, on_aggregate_update, on_deployment, on_schedule, on_ingestion_endpoint, and on_manual_invoke.
  • self.api is a ProcessorDataClient pre-authenticated with the processor's invocation context. Use it to read and write channels, messages, aggregates, and notifications without managing tokens yourself.
  • run_app(app, event, context) is the Lambda entry point. It unpacks the SNS or EventBridge payload and routes it to your handler.
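
The unpack-and-dispatch flow in the points above can be pictured with the following plain-Python sketch. It is an illustration under stated assumptions, not pydoover's implementation: SketchProcessor, unwrap, dispatch, and the event_type field are hypothetical. Only the Records / Sns / Message envelope is the standard shape of an SNS-triggered Lambda event.

```python
import asyncio
import json


class SketchProcessor:
    """Hypothetical stand-in; not pydoover's Application."""

    def __init__(self):
        self.seen = []

    async def on_message_create(self, event):
        self.seen.append(("message_create", event["channel"]))

    async def on_schedule(self, event):
        self.seen.append(("schedule", None))


def unwrap(lambda_event):
    """Return the inner payload of an SNS-wrapped Lambda event, or the
    event itself when there is no SNS envelope."""
    records = lambda_event.get("Records")
    if records and "Sns" in records[0]:
        return json.loads(records[0]["Sns"]["Message"])
    return lambda_event


async def dispatch(app, lambda_event):
    payload = unwrap(lambda_event)
    # "event_type" is an assumed field name used only in this sketch.
    handler = getattr(app, f"on_{payload['event_type']}", None)
    if handler is None:
        return False  # no handler defined for this event type
    await handler(payload)
    return True


sns_event = {
    "Records": [
        {"Sns": {"Message": json.dumps(
            {"event_type": "message_create", "channel": "sensor_data"}
        )}}
    ]
}
processor = SketchProcessor()
handled = asyncio.run(dispatch(processor, sns_event))
unhandled = asyncio.run(dispatch(processor, {"event_type": "deployment"}))
print(handled, unhandled, processor.seen)
```

Looking handlers up by name keeps the subclass small: only the events it cares about need a method, and everything else falls through untouched.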

Processors also support Tags, UI, and Configuration using the same declarative pattern as Docker apps.

For filtering, lifecycle hooks, and the full event type reference, see the Cloud Processors section.

3. API client (standalone scripts)

Use DataClient or AsyncDataClient to interact with the Doover Data API from any Python environment: scripts, notebooks, CI pipelines, or backend services.

Synchronous client

The synchronous client uses httpx under the hood and works in regular (non-async) Python code.

from pydoover.api import DataClient

# Authenticate using a named profile from your Doover CLI config.
# Profiles are created with `doover auth login`.
client = DataClient(profile="default")

# Fetch a channel by agent ID and channel name
channel = client.fetch_channel(agent_id=12345, channel_name="sensor_data")
print(f"Channel: {channel.name}, ID: {channel.id}")

# List recent messages on that channel
messages = client.list_messages(agent_id=12345, channel_name="sensor_data", limit=10)
for msg in messages:
    print(msg.payload)

# Create a new message
client.create_message(
    agent_id=12345,
    channel_name="commands",
    payload={"action": "restart"},
)

client.close()

The profile parameter loads credentials from the Doover CLI config file (created by doover auth login). You can also authenticate with a raw token:

client = DataClient(token="your-api-token")
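
The synchronous example above calls client.close() as its last statement, so an exception raised by an earlier call would skip it. One way to guard against that, assuming only that the client has a close() method as shown above, is the standard library's contextlib.closing. The sketch below uses a hypothetical SketchClient in place of DataClient so that it runs without credentials. Whether DataClient can also be used directly in a with statement is not covered here.

```python
from contextlib import closing


class SketchClient:
    """Hypothetical stand-in exposing only close(); not pydoover's DataClient."""

    def __init__(self):
        self.closed = False

    def fetch(self, name):
        raise RuntimeError(f"simulated failure fetching {name}")

    def close(self):
        self.closed = True


sketch_client = SketchClient()
error = None
try:
    # closing() calls sketch_client.close() when the block exits,
    # including when the body raises.
    with closing(sketch_client) as c:
        c.fetch("sensor_data")
except RuntimeError as exc:
    error = str(exc)

print(sketch_client.closed, error)
```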

Asynchronous client

The async client uses aiohttp and is suitable for applications that already use asyncio.

import asyncio
from pydoover.api import AsyncDataClient


async def main():
    async with AsyncDataClient(profile="default") as client:
        channel = await client.fetch_channel(
            agent_id=12345,
            channel_name="sensor_data",
        )
        print(f"Channel: {channel.name}")

        messages = await client.list_messages(
            agent_id=12345,
            channel_name="sensor_data",
            limit=5,
        )
        for msg in messages:
            print(msg.payload)


asyncio.run(main())

The async client supports the same operations as the synchronous client. Use async with to ensure the underlying session is closed properly.
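
To show why async with guarantees cleanup, here is a small sketch of the asynchronous context manager protocol. SketchAsyncClient is a hypothetical stand-in, not AsyncDataClient, and the session_open flag only simulates a network session.

```python
import asyncio


class SketchAsyncClient:
    """Hypothetical stand-in; not pydoover's AsyncDataClient."""

    def __init__(self):
        self.session_open = False

    async def __aenter__(self):
        self.session_open = True  # a real client would open its session here
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.session_open = False  # runs on normal exit and on error
        return False  # let any exception propagate to the caller

    async def fetch(self, name):
        raise RuntimeError(f"simulated failure fetching {name}")


async def run_once():
    client = SketchAsyncClient()
    try:
        async with client:
            await client.fetch("sensor_data")
    except RuntimeError:
        pass  # the error still reached us, after cleanup ran
    return client.session_open


still_open = asyncio.run(run_once())
print(still_open)
```

Because __aexit__ runs even when the body raises, the session is released without a separate try / finally around every call.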

For the full API reference including channel CRUD, timeseries queries, aggregates, notifications, and alarm management, see the Cloud API section.