Core Concepts

This page covers the fundamental concepts you need to understand when building pydoover applications: the tag-based state system, async/sync abstraction patterns, and gRPC interfaces.

Tag-Based State System

Tags are the primary mechanism for sharing state between applications and with the cloud. They provide a simple key-value store that automatically synchronizes across your device and with the Doover platform.

Understanding Tags

Tags are organized into two categories:

  • App Tags - Scoped to a specific application, namespaced by app_key
  • Global Tags - Shared across all applications on the device

Getting Tag Values

Use get_tag() to retrieve app-scoped tags:

# Get a tag for the current app
value = self.get_tag("my_tag")

# Get a tag with a default value
value = self.get_tag("my_tag", default=0)

# Get a tag from another app
value = self.get_tag("other_tag", app_key="other-app-1234")

Use get_global_tag() for global tags:

# Get a global tag
system_status = self.get_global_tag("system_status")

# With a default value
is_enabled = self.get_global_tag("feature_enabled", default=False)

Setting Tag Values

Use set_tag() to update app-scoped tags:

# Set a tag for the current app
self.set_tag("temperature", 25.5)

# Set a tag for another app
self.set_tag("shared_value", 100, app_key="other-app-1234")

# Always set, even if value hasn't changed
self.set_tag("counter", count, only_if_changed=False)

Use set_global_tag() for global tags:

# Set a global tag
self.set_global_tag("system_status", "operational")

Tag Change Detection

By default, set_tag() only publishes when the value changes:

import time

# These two calls publish only once, because the value is unchanged
self.set_tag("status", "ready")
self.set_tag("status", "ready")  # Not published

# Force publish even if unchanged
self.set_tag("heartbeat", time.time(), only_if_changed=False)
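
The publish-on-change rule can be pictured with a small in-memory stand-in. This is a minimal sketch, not pydoover code: TagStoreSketch and its published list are hypothetical names used only to show when a call would or would not result in a publish.

```python
class TagStoreSketch:
    """Hypothetical in-memory stand-in for the tag store; not pydoover code."""

    def __init__(self):
        self.values = {}
        self.published = []  # records every (key, value) that would be sent

    def set_tag(self, key, value, only_if_changed=True):
        unchanged = key in self.values and self.values[key] == value
        if only_if_changed and unchanged:
            return False  # nothing new to publish
        self.values[key] = value
        self.published.append((key, value))
        return True


store = TagStoreSketch()
store.set_tag("status", "ready")
store.set_tag("status", "ready")                       # skipped: value unchanged
store.set_tag("heartbeat", 1, only_if_changed=False)
store.set_tag("heartbeat", 1, only_if_changed=False)   # published again
print(store.published)
```

The forced path is useful for heartbeat-style tags, where the act of publishing carries information even when the value is identical.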

Subscribing to Tag Changes

Use subscribe_to_tag() to receive callbacks when tags change:

def setup(self):
    # Subscribe to a tag from current app
    self.subscribe_to_tag("temperature", self.on_temperature_change)

    # Subscribe to a tag from another app
    self.subscribe_to_tag(
        "other_tag",
        self.on_other_change,
        app_key="other-app-1234"
    )

    # Subscribe to a global tag
    self.subscribe_to_tag(
        "system_command",
        self.on_command,
        global_tag=True
    )

def on_temperature_change(self, tag_key: str, new_value):
    print(f"Temperature changed to: {new_value}")

async def on_other_change(self, tag_key: str, new_value):
    # Callbacks can be async
    await self.process_change(new_value)
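
Because callbacks may be plain functions or coroutines, whatever delivers the change has to handle both. The sketch below shows one way such a dispatcher could work; TagDispatcherSketch and notify() are hypothetical and only illustrate the (tag_key, new_value) callback contract described above.

```python
import asyncio
import inspect


class TagDispatcherSketch:
    """Hypothetical dispatcher; illustrates the callback contract only."""

    def __init__(self):
        self._subscribers = {}  # tag key -> list of callbacks

    def subscribe_to_tag(self, tag_key, callback):
        self._subscribers.setdefault(tag_key, []).append(callback)

    async def notify(self, tag_key, new_value):
        for callback in self._subscribers.get(tag_key, []):
            result = callback(tag_key, new_value)
            if inspect.isawaitable(result):
                await result  # async callbacks return a coroutine to await


seen = []


def on_temperature_change(tag_key, new_value):
    seen.append(("sync", tag_key, new_value))


async def on_other_change(tag_key, new_value):
    await asyncio.sleep(0)  # stands in for real async work
    seen.append(("async", tag_key, new_value))


dispatcher = TagDispatcherSketch()
dispatcher.subscribe_to_tag("temperature", on_temperature_change)
dispatcher.subscribe_to_tag("temperature", on_other_change)
asyncio.run(dispatcher.notify("temperature", 25.5))
print(seen)
```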

Tag Best Practices

  1. Use app tags by default - Only use global tags for truly device-wide state
  2. Choose descriptive names - Tag names should clearly indicate their purpose
  3. Keep values simple - Tags work best with primitive types and simple dictionaries
  4. Avoid high-frequency updates - Tags are synced to the cloud, so avoid setting them too frequently
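
One way to follow the last recommendation is to rate-limit writes per tag. The helper below is a sketch under stated assumptions: ThrottledTagWriter is a hypothetical class, the 10-second interval is an arbitrary example, and the fake clock exists only to make the demonstration deterministic.

```python
import time


class ThrottledTagWriter:
    """Hypothetical helper: forwards a tag update at most once per interval."""

    def __init__(self, set_tag, min_interval_secs, clock=time.monotonic):
        self._set_tag = set_tag
        self._min_interval = min_interval_secs
        self._clock = clock
        self._last_sent = {}  # tag key -> time of the last forwarded update

    def set_tag(self, key, value):
        now = self._clock()
        last = self._last_sent.get(key)
        if last is not None and now - last < self._min_interval:
            return False  # too soon; this update is dropped
        self._last_sent[key] = now
        self._set_tag(key, value)
        return True


# Demonstration with a fake clock and a list standing in for the real set_tag
sent = []
fake_now = [0.0]
writer = ThrottledTagWriter(
    lambda key, value: sent.append((key, value)),
    min_interval_secs=10.0,
    clock=lambda: fake_now[0],
)

writer.set_tag("temperature", 25.5)  # forwarded
fake_now[0] = 3.0
writer.set_tag("temperature", 25.7)  # dropped: only 3 s since the last send
fake_now[0] = 12.0
writer.set_tag("temperature", 26.0)  # forwarded: 12 s since the last send
print(sent)
```

In an application, self.set_tag could be passed as the first argument. Note that dropped updates are lost in this sketch, so the stored tag can lag behind the latest reading by up to one interval.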

Async/Sync Patterns

pydoover supports both synchronous and asynchronous programming patterns. The framework automatically adapts based on how you write your setup() and main_loop() methods.

Automatic Detection

The framework detects whether your application is async from how setup() and main_loop() are declared:

# Synchronous application
class SyncApp(Application):
    def setup(self):
        self.set_tag("ready", True)

    def main_loop(self):
        value = self.platform_iface.get_di(0)
        self.set_tag("input", value)

# Asynchronous application
class AsyncApp(Application):
    async def setup(self):
        await self.initialize()
        self.set_tag("ready", True)

    async def main_loop(self):
        value = await self.platform_iface.get_di(0)
        self.set_tag("input", value)
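
Detection of this kind can be done with the standard library. The following is a guess at the general idea rather than pydoover's actual logic; is_async_app is a hypothetical function, and the two example classes are stand-ins for Application subclasses.

```python
import inspect


def is_async_app(app_cls):
    """Hypothetical check: async if either lifecycle hook is a coroutine function."""
    return any(
        inspect.iscoroutinefunction(getattr(app_cls, name, None))
        for name in ("setup", "main_loop")
    )


class SyncExample:
    def setup(self): ...
    def main_loop(self): ...


class AsyncExample:
    async def setup(self): ...
    async def main_loop(self): ...


print(is_async_app(SyncExample), is_async_app(AsyncExample))
```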

The maybe_async Decorator

Many pydoover methods use the @maybe_async() decorator to provide a unified interface:

from pydoover.utils import maybe_async

class MyClass:
    @maybe_async()
    def my_function(self, value: str):
        # Sync implementation
        return "sync result"

    async def my_function_async(self, value: str):
        # Async implementation
        return "async result"

When you call my_function():

  • In a sync context, the sync version runs directly
  • In an async context, the async version is called and you must await it

# In async context
result = await obj.my_function("test")  # Calls my_function_async

# In sync context
result = obj.my_function("test")  # Calls my_function directly
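
To make the dispatch concrete, here is a minimal sketch of how such a decorator could be built. It is not the pydoover source. It assumes the mode is tracked by an _is_async attribute on the instance (the GRPCInterface constructor on this page stores one) and that the async sibling is named by appending _async, as in the example above. maybe_async_sketch and Greeter are hypothetical names.

```python
import asyncio
import functools


def maybe_async_sketch():
    """Hypothetical re-creation of the dispatch idea; not the pydoover source."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if getattr(self, "_is_async", False):
                # Hand back the coroutine from "<name>_async"; the caller awaits it
                return getattr(self, func.__name__ + "_async")(*args, **kwargs)
            return func(self, *args, **kwargs)
        return wrapper
    return decorator


class Greeter:
    def __init__(self, is_async):
        self._is_async = is_async

    @maybe_async_sketch()
    def greet(self, name):
        return f"sync hello {name}"

    async def greet_async(self, name):
        return f"async hello {name}"


sync_result = Greeter(is_async=False).greet("a")


async def main():
    return await Greeter(is_async=True).greet("a")


async_result = asyncio.run(main())
print(sync_result, "|", async_result)
```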

The call_maybe_async Helper

Use call_maybe_async() to invoke functions that may be sync or async:

from pydoover.utils import call_maybe_async

# Works with both sync and async functions
await call_maybe_async(callback, arg1, arg2)

# Run as a background task
task = await call_maybe_async(callback, arg1, as_task=True)

# Run sync function in executor (non-blocking)
await call_maybe_async(sync_function, arg1, in_executor=True)

# Run sync function directly (blocking)
await call_maybe_async(sync_function, arg1, in_executor=False)

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| func | Callable | required | The function to call |
| *args | any | - | Arguments to pass to the function |
| as_task | bool | False | Run as an asyncio task |
| in_executor | bool | True | Run sync functions in executor |
| **kwargs | any | - | Keyword arguments (not supported with executor) |
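
The parameters above map onto a few asyncio primitives. The function below is a hypothetical re-implementation for illustration, not the library's code. It also suggests why keyword arguments and the executor do not mix: loop.run_in_executor() only forwards positional arguments.

```python
import asyncio
import inspect


async def call_maybe_async_sketch(func, *args, as_task=False, in_executor=True, **kwargs):
    """Hypothetical re-implementation for illustration; not the pydoover source."""
    if inspect.iscoroutinefunction(func):
        coro = func(*args, **kwargs)
        if as_task:
            return asyncio.create_task(coro)  # caller receives the Task
        return await coro
    if in_executor:
        # run_in_executor takes positional arguments only; kwargs are not forwarded
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)
    return func(*args, **kwargs)  # runs inline and blocks the event loop


def add(a, b):
    return a + b


async def multiply(a, b):
    return a * b


async def demo():
    direct = await call_maybe_async_sketch(add, 1, 2, in_executor=False)
    pooled = await call_maybe_async_sketch(add, 3, 4)
    awaited = await call_maybe_async_sketch(multiply, 3, 4)
    task = await call_maybe_async_sketch(multiply, 5, 6, as_task=True)
    return direct, pooled, awaited, await task


results = asyncio.run(demo())
print(results)
```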

Choosing Sync vs Async

Use synchronous code when:

  • Your logic is simple and sequential
  • You don't need concurrent operations
  • You're more comfortable with sync patterns

Use asynchronous code when:

  • You need concurrent I/O operations
  • You're integrating with async libraries
  • You want the main loop to stay responsive while waiting on slow I/O

Advanced Pattern: State Machines

For applications with complex operational states (startup sequences, error handling, multi-step processes), consider using the transitions library to manage state flows. This pattern provides:

  • Timeout-driven transitions - Auto-fail if an operation doesn't complete in time
  • Queued transitions - Handle rapid state changes without race conditions
  • Clear state diagrams - States and transitions map directly to code

See the State Machine Integration example for a complete implementation.
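
The timeout-driven idea can be shown without any dependency. The sketch below is a hand-rolled machine, not the transitions library and not the linked example; the state names, the 5-second timeout and the TimedStateMachine class are all hypothetical. It does not cover queued transitions.

```python
import time


class TimedStateMachine:
    """Hypothetical hand-rolled machine; transitions offers a richer version."""

    # state -> (timeout in seconds, state to enter when the timeout expires)
    TIMEOUTS = {"starting": (5.0, "error")}
    # (current state, trigger) -> next state
    TRANSITIONS = {
        ("idle", "start"): "starting",
        ("starting", "started"): "running",
        ("running", "stop"): "idle",
        ("error", "reset"): "idle",
    }

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self.state = "idle"
        self._entered_at = clock()

    def _enter(self, state):
        self.state = state
        self._entered_at = self._clock()

    def trigger(self, name):
        next_state = self.TRANSITIONS.get((self.state, name))
        if next_state is None:
            return False  # trigger is not valid in the current state
        self._enter(next_state)
        return True

    def tick(self):
        """Call once per main loop iteration to apply timeouts."""
        timeout = self.TIMEOUTS.get(self.state)
        if timeout and self._clock() - self._entered_at >= timeout[0]:
            self._enter(timeout[1])


now = [0.0]  # fake clock so the demonstration is deterministic
machine = TimedStateMachine(clock=lambda: now[0])
machine.trigger("start")
now[0] = 2.0
machine.tick()
state_before_timeout = machine.state
now[0] = 6.0
machine.tick()
state_after_timeout = machine.state
print(state_before_timeout, state_after_timeout)
```

Calling tick() from main_loop() keeps timeout handling in one place, at the cost of timeout resolution being limited to the loop period.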

gRPC Interfaces

pydoover uses gRPC for communication between your application and system services. Understanding the interface architecture helps you work effectively with the platform.

GRPCInterface Base Class

All interfaces extend the GRPCInterface base class:

class GRPCInterface:
    def __init__(
        self,
        app_key: str,
        uri: str,
        is_async: bool = False,
        timeout: int = 7
    ):
        self.app_key = app_key
        self.uri = uri
        self._is_async = is_async
        self.timeout = timeout

DeviceAgentInterface

The Device Agent Interface manages cloud communication:

# Available on Application as self.device_agent

# Check connectivity
is_available = self.device_agent.get_is_dda_available()
is_online = self.device_agent.get_is_dda_online()
has_been_online = self.device_agent.get_has_dda_been_online()

# Channel operations
self.device_agent.add_subscription("channel", callback)
self.device_agent.publish_to_channel("channel", {"data": "value"})
aggregate = self.device_agent.get_channel_aggregate("channel")

# Wait for DDA
await self.device_agent.await_dda_available_async(timeout=30)

Key methods:

| Method | Description |
| --- | --- |
| add_subscription(channel, callback) | Subscribe to channel updates |
| publish_to_channel(channel, data) | Send data to a channel |
| get_channel_aggregate(channel) | Get current channel state |
| get_is_dda_online() | Check if connected to cloud |
| test_comms(message) | Test connection with echo |

PlatformInterface

The Platform Interface provides hardware I/O access:

# Available on Application as self.platform_iface

# Digital I/O
di_value = self.platform_iface.get_di(0)
self.platform_iface.set_do(0, True)
self.platform_iface.schedule_do(0, False, delay_secs=10)

# Analog I/O
ai_value = self.platform_iface.get_ai(0)
self.platform_iface.set_ao(0, 3.3)
self.platform_iface.schedule_ao(0, 0.0, delay_secs=5)

# System information
voltage = self.platform_iface.get_system_voltage()
power = self.platform_iface.get_system_power()
temp = self.platform_iface.get_system_temperature()
location = self.platform_iface.get_location()

# Pulse counting
counter = self.platform_iface.get_new_pulse_counter(
    di=0,
    edge="rising",
    callback=my_callback
)

Key methods:

| Method | Description |
| --- | --- |
| get_di(*pins) | Read digital input(s) |
| set_do(pins, values) | Set digital output(s) |
| get_ai(*pins) | Read analog input(s) |
| set_ao(pins, values) | Set analog output(s) |
| schedule_do/schedule_ao | Delayed I/O changes |
| get_new_pulse_counter() | Create pulse counter |

ModbusInterface

The Modbus Interface handles industrial protocol communication:

# Available on Application as self.modbus_iface

# Read registers
values = self.modbus_iface.read_registers(
    bus_id="default",
    modbus_id=1,
    start_address=0,
    num_registers=10,
    register_type=4
)

# Write registers
success = self.modbus_iface.write_registers(
    bus_id="default",
    modbus_id=1,
    start_address=0,
    values=[100, 200, 300]
)

# Subscribe to register updates
self.modbus_iface.add_read_register_subscription(
    bus_id="default",
    modbus_id=1,
    start_address=0,
    num_registers=10,
    poll_secs=5,
    callback=my_callback
)

# Bus management
self.modbus_iface.open_bus(bus_type="serial", name="my_bus", ...)
self.modbus_iface.close_bus(bus_id="my_bus")
status = self.modbus_iface.get_bus_status(bus_id="my_bus")

Key methods:

| Method | Description |
| --- | --- |
| read_registers() | Read Modbus registers |
| write_registers() | Write to Modbus registers |
| add_read_register_subscription() | Periodic polling with callback |
| open_bus() | Open a Modbus bus connection |
| close_bus() | Close a Modbus bus |
| get_bus_status() | Check if bus is open |
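
Modbus registers are 16 bits wide, so values such as floats or signed integers usually need decoding after a read. The helpers below are a sketch that assumes read_registers() yields a list of unsigned 16-bit integers and that the device uses big-endian word order; both are assumptions to verify against your device documentation. The function names are hypothetical.

```python
import struct


def registers_to_float32(high_word, low_word):
    """Combine two 16-bit registers (big-endian word order) into an IEEE 754 float."""
    raw = struct.pack(">HH", high_word, low_word)
    return struct.unpack(">f", raw)[0]


def register_to_int16(word):
    """Reinterpret an unsigned 16-bit register as a signed value."""
    return word - 0x10000 if word & 0x8000 else word


# 0x41C80000 is the IEEE 754 encoding of 25.0
temperature = registers_to_float32(0x41C8, 0x0000)
signed = register_to_int16(0xFFFE)
print(temperature, signed)
```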

Application Convenience Methods

The Application class provides convenience wrappers for common interface operations:

# DDA status
self.get_is_dda_available()
self.get_is_dda_online()
self.get_has_dda_been_online()

# Channel operations
self.subscribe_to_channel(name, callback)
self.publish_to_channel(name, data)
self.get_channel_aggregate(name)

# Platform I/O
self.get_di(pin)
self.get_ai(pin)
self.get_do(pin)
self.set_do(pin, value)
self.get_ao(pin)
self.set_ao(pin, value)
self.schedule_do(pin, value, delay)
self.schedule_ao(pin, value, delay)

# Modbus
self.read_modbus_registers(address, count, register_type, modbus_id, bus_id)
self.write_modbus_registers(address, values, register_type, modbus_id, bus_id)
self.add_new_modbus_read_subscription(address, count, register_type, callback, ...)

Error Handling

All gRPC interfaces include built-in error handling:

  • Failed requests return None rather than raising exceptions
  • Errors are logged automatically
  • Response validation checks for success status

# Safe pattern - check for None
value = self.platform_iface.get_di(0)
if value is not None:
    self.process(value)
else:
    self.log.warning("Failed to read digital input")
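
Since failures surface as None rather than exceptions, a small retry wrapper can keep call sites tidy. read_with_retry below is a hypothetical helper, not part of pydoover, and the flaky reader is a stand-in for a real interface call.

```python
import time


def read_with_retry(read, attempts=3, delay_secs=0.0, sleep=time.sleep):
    """Hypothetical helper: call read() until it returns something other than None."""
    for attempt in range(attempts):
        value = read()
        if value is not None:
            return value
        if attempt < attempts - 1:
            sleep(delay_secs)  # blocking wait between attempts
    return None


# Stand-in for a flaky interface call: fails twice, then succeeds
responses = iter([None, None, True])
value = read_with_retry(lambda: next(responses))
exhausted = read_with_retry(lambda: None, attempts=2)
print(value, exhausted)
```

In a synchronous application this might be called as read_with_retry(lambda: self.platform_iface.get_di(0)). The blocking sleep makes this version unsuitable for async applications, which would need an asyncio.sleep-based variant.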

Utility Functions

pydoover provides several utility functions:

on_change Decorator

Trigger callbacks when a function's return value changes:

from pydoover.utils import on_change

class MyApp(Application):
    def my_callback(self, new, old, is_first, name):
        print(f"{name}: {old} -> {new}")

    @on_change("my_callback", name="sensor_reading")
    def read_sensor(self):
        return self.platform_iface.get_ai(0)
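
The mechanism behind such a decorator can be sketched in a few lines. This is a hypothetical re-creation, not the library's implementation. In particular it assumes the callback also fires on the first call with is_first=True and old=None, which the callback signature suggests but the text above does not confirm.

```python
import functools

_UNSET = object()  # sentinel meaning "no previous value recorded"


def on_change_sketch(callback_name, name=None):
    """Hypothetical re-creation: call self.<callback_name> when the result changes."""
    def decorator(func):
        attr = f"_last_{func.__name__}"
        label = name or func.__name__

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            new = func(self, *args, **kwargs)
            old = getattr(self, attr, _UNSET)
            is_first = old is _UNSET
            if is_first or new != old:
                callback = getattr(self, callback_name)
                callback(new, None if is_first else old, is_first, label)
            setattr(self, attr, new)  # remember the value per instance
            return new
        return wrapper
    return decorator


class Sensor:
    def __init__(self, readings):
        self._readings = iter(readings)
        self.events = []

    def record(self, new, old, is_first, name):
        self.events.append((name, old, new, is_first))

    @on_change_sketch("record", name="sensor_reading")
    def read_sensor(self):
        return next(self._readings)


sensor = Sensor([1.0, 1.0, 2.5])
for _ in range(3):
    sensor.read_sensor()
print(sensor.events)
```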

Logging Setup

Configure logging with the built-in formatter:

from pydoover.utils import setup_logging, LogFormatter

# Basic setup
setup_logging(debug=True)

# Custom formatter
custom_formatter = LogFormatter()
setup_logging(debug=False, formatter=custom_formatter)

Dictionary Utilities

Work with nested dictionaries:

from pydoover.utils import find_object_with_key, find_path_to_key

data = {"a": {"b": {"c": "value"}}}

# Find a value
value = find_object_with_key(data, "c")  # Returns "value"

# Find the path
path = find_path_to_key(data, "c")  # Returns "a.b.c"
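
A path search like this is a depth-first walk over the nested dictionaries. The function below is a hypothetical sketch of that idea, not the pydoover implementation. It returns the first match it encounters and None when the key is absent, which may differ from the library's behavior.

```python
def find_path_sketch(data, key, prefix=""):
    """Hypothetical depth-first search returning a dotted path, or None if absent."""
    if not isinstance(data, dict):
        return None
    for current_key, value in data.items():
        path = f"{prefix}.{current_key}" if prefix else str(current_key)
        if current_key == key:
            return path
        found = find_path_sketch(value, key, path)
        if found is not None:
            return found
    return None


data = {"a": {"b": {"c": "value"}}, "d": 1}
print(find_path_sketch(data, "c"))
```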

Diff Utilities

Generate and apply differences between dictionaries:

from pydoover.utils import generate_diff, apply_diff

old = {"a": 1, "b": 2}
new = {"a": 1, "b": 3, "c": 4}

# Generate diff
diff = generate_diff(old, new)  # {"b": 3, "c": 4}

# Apply diff
result = apply_diff(old, diff)  # {"a": 1, "b": 3, "c": 4}
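
For nested dictionaries, a diff of this shape can be computed recursively. The two functions below are hypothetical sketches, not the library code. They assume that a removed key is represented by None in the diff, which is a convention chosen for this example and means a literal None value cannot be stored.

```python
def generate_diff_sketch(old, new):
    """Hypothetical recursive diff: changed or added keys, None for removed keys."""
    diff = {}
    for key, value in new.items():
        if key not in old:
            diff[key] = value
        elif isinstance(value, dict) and isinstance(old[key], dict):
            nested = generate_diff_sketch(old[key], value)
            if nested:
                diff[key] = nested
        elif old[key] != value:
            diff[key] = value
    for key in old:
        if key not in new:
            diff[key] = None  # assumption: None marks a deleted key
    return diff


def apply_diff_sketch(base, diff):
    """Return a new dict with the diff merged into base; base is not modified."""
    result = dict(base)
    for key, value in diff.items():
        if value is None:
            result.pop(key, None)
        elif isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = apply_diff_sketch(result[key], value)
        else:
            result[key] = value
    return result


old = {"a": 1, "b": 2, "nested": {"x": 1, "y": 2}, "gone": True}
new = {"a": 1, "b": 3, "c": 4, "nested": {"x": 1, "y": 5}}
diff = generate_diff_sketch(old, new)
print(diff)
```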