Core Concepts
This page covers the fundamental concepts you need to understand when building pydoover applications: the tag-based state system, async/sync abstraction patterns, and gRPC interfaces.
Tag-Based State System
Tags are the primary mechanism for sharing state between applications and with the cloud. They provide a simple key-value store that automatically synchronizes across your device and with the Doover platform.
Understanding Tags
Tags are organized in two categories:
- App Tags - Scoped to a specific application, namespaced by app_key
- Global Tags - Shared across all applications on the device
Getting Tag Values
Use get_tag() to retrieve app-scoped tags:
# Get a tag for the current app
value = self.get_tag("my_tag")
# Get a tag with a default value
value = self.get_tag("my_tag", default=0)
# Get a tag from another app
value = self.get_tag("other_tag", app_key="other-app-1234")
Use get_global_tag() for global tags:
# Get a global tag
system_status = self.get_global_tag("system_status")
# With a default value
is_enabled = self.get_global_tag("feature_enabled", default=False)
Setting Tag Values
Use set_tag() to update app-scoped tags:
# Set a tag for the current app
self.set_tag("temperature", 25.5)
# Set a tag for another app
self.set_tag("shared_value", 100, app_key="other-app-1234")
# Always set, even if value hasn't changed
self.set_tag("counter", count, only_if_changed=False)
Use set_global_tag() for global tags:
# Set a global tag
self.set_global_tag("system_status", "operational")
Tag Change Detection
By default, set_tag() only publishes when the value changes:
# These calls only publish once (value unchanged)
self.set_tag("status", "ready")
self.set_tag("status", "ready") # Not published
# Force publish even if unchanged
self.set_tag("heartbeat", time.time(), only_if_changed=False)
Subscribing to Tag Changes
Use subscribe_to_tag() to receive callbacks when tags change:
def setup(self):
# Subscribe to a tag from current app
self.subscribe_to_tag("temperature", self.on_temperature_change)
# Subscribe to a tag from another app
self.subscribe_to_tag(
"other_tag",
self.on_other_change,
app_key="other-app-1234"
)
# Subscribe to a global tag
self.subscribe_to_tag(
"system_command",
self.on_command,
global_tag=True
)
def on_temperature_change(self, tag_key: str, new_value):
print(f"Temperature changed to: {new_value}")
async def on_other_change(self, tag_key: str, new_value):
# Callbacks can be async
await self.process_change(new_value)
Tag Best Practices
- Use app tags by default - Only use global tags for truly device-wide state
- Choose descriptive names - Tag names should clearly indicate their purpose
- Keep values simple - Tags work best with primitive types and simple dictionaries
- Avoid high-frequency updates - Tags are synced to the cloud, so avoid setting them too frequently (see the throttling sketch below)
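To make the last point concrete, here is a minimal sketch that throttles a noisy analog reading so its tag is published at most once every 30 seconds. The tag name, pin number, and interval are illustrative; get_ai() is covered under PlatformInterface below:
import time
class ThrottledSensorApp(Application):
    def setup(self):
        # Track when the tag was last pushed to the cloud
        self._last_publish = 0.0
    def main_loop(self):
        reading = self.platform_iface.get_ai(0)
        if reading is None:
            return
        # Publish at most once every 30 seconds, even if the value keeps changing
        if time.time() - self._last_publish >= 30:
            self.set_tag("tank_level", round(reading, 2))
            self._last_publish = time.time()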
Async/Sync Patterns
pydoover supports both synchronous and asynchronous programming patterns. The framework automatically adapts based on how you write your setup() and main_loop() methods.
Automatic Detection
The framework detects whether your application is async:
# Synchronous application
class SyncApp(Application):
def setup(self):
self.set_tag("ready", True)
def main_loop(self):
value = self.platform_iface.get_di(0)
self.set_tag("input", value)
# Asynchronous application
class AsyncApp(Application):
async def setup(self):
await self.initialize()
self.set_tag("ready", True)
async def main_loop(self):
value = await self.platform_iface.get_di(0)
self.set_tag("input", value)
The maybe_async Decorator
Many pydoover methods use the @maybe_async() decorator to provide a unified interface:
from pydoover.utils import maybe_async
class MyClass:
@maybe_async()
def my_function(self, value: str):
# Sync implementation
return "sync result"
async def my_function_async(self, value: str):
# Async implementation
return "async result"
When you call my_function():
- In a sync context, the sync version runs directly
- In an async context, the async version is called and you must await it
# In async context
result = await obj.my_function("test") # Calls my_function_async
# In sync context
result = obj.my_function("test") # Calls my_function directly
The call_maybe_async Helper
Use call_maybe_async() to invoke functions that may be sync or async:
from pydoover.utils import call_maybe_async
# Works with both sync and async functions
await call_maybe_async(callback, arg1, arg2)
# Run as a background task
task = await call_maybe_async(callback, arg1, as_task=True)
# Run sync function in executor (non-blocking)
await call_maybe_async(sync_function, arg1, in_executor=True)
# Run sync function directly (blocking)
await call_maybe_async(sync_function, arg1, in_executor=False)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| func | Callable | required | The function to call |
| *args | any | - | Arguments to pass to the function |
| as_task | bool | False | Run as an asyncio task |
| in_executor | bool | True | Run sync functions in executor |
| **kwargs | any | - | Keyword arguments (not supported with executor) |
Choosing Sync vs Async
Use synchronous code when:
- Your logic is simple and sequential
- You don't need concurrent operations
- You're more comfortable with sync patterns
Use asynchronous code when:
- You need concurrent I/O operations (see the sketch after this list)
- You're integrating with async libraries
- You want maximum performance
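For example, an async main_loop can issue several reads concurrently. This sketch assumes the platform calls return awaitables in an async app, as in the async example above; the pin numbers and tag name are illustrative:
import asyncio
class ConcurrentApp(Application):
    async def main_loop(self):
        # Read a digital and an analog input concurrently rather than sequentially
        di_value, ai_value = await asyncio.gather(
            self.platform_iface.get_di(0),
            self.platform_iface.get_ai(0),
        )
        self.set_tag("inputs", {"di_0": di_value, "ai_0": ai_value})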
Advanced Pattern: State Machines
For applications with complex operational states (startup sequences, error handling, multi-step processes), consider using the transitions library to manage state flows. This pattern provides:
- Timeout-driven transitions - Auto-fail if an operation doesn't complete in time
- Queued transitions - Handle rapid state changes without race conditions
- Clear state diagrams - States and transitions map directly to code
See the State Machine Integration example for a complete implementation.
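As a rough illustration of the pattern (the states, triggers, and timeout below are invented for this sketch and are not part of pydoover), the transitions library can be wired up like this:
from transitions import Machine
from transitions.extensions.states import add_state_features, Timeout
@add_state_features(Timeout)
class TimeoutMachine(Machine):
    # A Machine whose states support a per-state timeout
    pass
class PumpController:
    states = [
        {"name": "idle"},
        # Auto-fail if startup doesn't complete within 30 seconds
        {"name": "starting", "timeout": 30, "on_timeout": "fail"},
        {"name": "running"},
        {"name": "error"},
    ]
    transitions = [
        {"trigger": "start", "source": "idle", "dest": "starting"},
        {"trigger": "confirm", "source": "starting", "dest": "running"},
        {"trigger": "fail", "source": "*", "dest": "error"},
    ]
    def __init__(self):
        # queued=True serializes rapid trigger calls instead of nesting them
        self.machine = TimeoutMachine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial="idle",
            queued=True,
        )
The triggers (start(), confirm(), fail()) become methods on the model, so a main_loop can drive the machine directly.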
gRPC Interfaces
pydoover uses gRPC for communication between your application and system services. Understanding the interface architecture helps you work effectively with the platform.
GRPCInterface Base Class
All interfaces extend the GRPCInterface base class:
class GRPCInterface:
def __init__(
self,
app_key: str,
uri: str,
is_async: bool = False,
timeout: int = 7
):
self.app_key = app_key
self.uri = uri
self._is_async = is_async
self.timeout = timeout
DeviceAgentInterface
The Device Agent Interface manages cloud communication:
# Available on Application as self.device_agent
# Check connectivity
is_available = self.device_agent.get_is_dda_available()
is_online = self.device_agent.get_is_dda_online()
has_been_online = self.device_agent.get_has_dda_been_online()
# Channel operations
self.device_agent.add_subscription("channel", callback)
self.device_agent.publish_to_channel("channel", {"data": "value"})
aggregate = self.device_agent.get_channel_aggregate("channel")
# Wait for DDA
await self.device_agent.await_dda_available_async(timeout=30)
Key methods:
| Method | Description |
|---|---|
| add_subscription(channel, callback) | Subscribe to channel updates |
| publish_to_channel(channel, data) | Send data to a channel |
| get_channel_aggregate(channel) | Get current channel state |
| get_is_dda_online() | Check if connected to cloud |
| test_comms(message) | Test connection with echo |
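A common pattern is to wait for the device agent in setup() before touching the cloud. This sketch simply combines the calls above; the channel name and payload are illustrative:
class CloudApp(Application):
    async def setup(self):
        # Block (for up to 30 seconds) until the device agent is reachable
        await self.device_agent.await_dda_available_async(timeout=30)
        if self.device_agent.get_is_dda_online():
            self.device_agent.publish_to_channel("status", {"started": True})
        else:
            self.log.warning("Device agent is available but not yet online")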
PlatformInterface
The Platform Interface provides hardware I/O access:
# Available on Application as self.platform_iface
# Digital I/O
di_value = self.platform_iface.get_di(0)
self.platform_iface.set_do(0, True)
self.platform_iface.schedule_do(0, False, delay_secs=10)
# Analog I/O
ai_value = self.platform_iface.get_ai(0)
self.platform_iface.set_ao(0, 3.3)
self.platform_iface.schedule_ao(0, 0.0, delay_secs=5)
# System information
voltage = self.platform_iface.get_system_voltage()
power = self.platform_iface.get_system_power()
temp = self.platform_iface.get_system_temperature()
location = self.platform_iface.get_location()
# Pulse counting
counter = self.platform_iface.get_new_pulse_counter(
di=0,
edge="rising",
callback=my_callback
)
Key methods:
| Method | Description |
|---|---|
| get_di(*pins) | Read digital input(s) |
| set_do(pins, values) | Set digital output(s) |
| get_ai(*pins) | Read analog input(s) |
| set_ao(pins, values) | Set analog output(s) |
| schedule_do/schedule_ao | Delayed I/O changes |
| get_new_pulse_counter() | Create pulse counter |
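Tying these together, a minimal main_loop might mirror a digital input onto a digital output and expose it as a tag (pin numbers and the tag name are illustrative):
class MirrorApp(Application):
    def main_loop(self):
        value = self.platform_iface.get_di(0)
        if value is None:
            # gRPC calls return None on failure (see Error Handling below)
            self.log.warning("Failed to read DI 0")
            return
        self.platform_iface.set_do(0, bool(value))
        self.set_tag("di_0", bool(value))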
ModbusInterface
The Modbus Interface handles industrial protocol communication:
# Available on Application as self.modbus_iface
# Read registers
values = self.modbus_iface.read_registers(
bus_id="default",
modbus_id=1,
start_address=0,
num_registers=10,
register_type=4
)
# Write registers
success = self.modbus_iface.write_registers(
bus_id="default",
modbus_id=1,
start_address=0,
values=[100, 200, 300]
)
# Subscribe to register updates
self.modbus_iface.add_read_register_subscription(
bus_id="default",
modbus_id=1,
start_address=0,
num_registers=10,
poll_secs=5,
callback=my_callback
)
# Bus management
self.modbus_iface.open_bus(bus_type="serial", name="my_bus", ...)
self.modbus_iface.close_bus(bus_id="my_bus")
status = self.modbus_iface.get_bus_status(bus_id="my_bus")
Key methods:
| Method | Description |
|---|---|
| read_registers() | Read Modbus registers |
| write_registers() | Write to Modbus registers |
| add_read_register_subscription() | Periodic polling with callback |
| open_bus() | Open a Modbus bus connection |
| close_bus() | Close a Modbus bus |
| get_bus_status() | Check if bus is open |
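As a small end-to-end sketch, the read call above can feed a tag directly. The bus, device ID, addresses, and register type reuse the illustrative values from the earlier example:
class ModbusPollApp(Application):
    def main_loop(self):
        values = self.modbus_iface.read_registers(
            bus_id="default",
            modbus_id=1,
            start_address=0,
            num_registers=2,
            register_type=4,
        )
        if values is not None:
            # Expose the first register so it is visible as device state
            self.set_tag("register_0", values[0])
        else:
            self.log.warning("Modbus read failed")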
Application Convenience Methods
The Application class provides convenience wrappers for common interface operations:
# DDA status
self.get_is_dda_available()
self.get_is_dda_online()
self.get_has_dda_been_online()
# Channel operations
self.subscribe_to_channel(name, callback)
self.publish_to_channel(name, data)
self.get_channel_aggregate(name)
# Platform I/O
self.get_di(pin)
self.get_ai(pin)
self.get_do(pin)
self.set_do(pin, value)
self.get_ao(pin)
self.set_ao(pin, value)
self.schedule_do(pin, value, delay)
self.schedule_ao(pin, value, delay)
# Modbus
self.read_modbus_registers(address, count, register_type, modbus_id, bus_id)
self.write_modbus_registers(address, values, register_type, modbus_id, bus_id)
self.add_new_modbus_read_subscription(address, count, register_type, callback, ...)
Error Handling
All gRPC interfaces include built-in error handling:
- Failed requests return None rather than raising exceptions
- Errors are logged automatically
- Response validation checks for success status
# Safe pattern - check for None
value = self.platform_iface.get_di(0)
if value is not None:
self.process(value)
else:
self.log.warning("Failed to read digital input")
Utility Functions
pydoover provides several utility functions:
on_change Decorator
Trigger callbacks when a function's return value changes:
from pydoover.utils import on_change
class MyApp(Application):
def my_callback(self, new, old, is_first, name):
print(f"{name}: {old} -> {new}")
@on_change("my_callback", name="sensor_reading")
def read_sensor(self):
return self.platform_iface.get_ai(0)
Logging Setup
Configure logging with the built-in formatter:
from pydoover.utils import setup_logging, LogFormatter
# Basic setup
setup_logging(debug=True)
# Custom formatter
custom_formatter = LogFormatter()
setup_logging(debug=False, formatter=custom_formatter)
Dictionary Utilities
Work with nested dictionaries:
from pydoover.utils import find_object_with_key, find_path_to_key
data = {"a": {"b": {"c": "value"}}}
# Find a value
value = find_object_with_key(data, "c") # Returns "value"
# Find the path
path = find_path_to_key(data, "c") # Returns "a.b.c"
Diff Utilities
Generate and apply differences between dictionaries:
from pydoover.utils import generate_diff, apply_diff
old = {"a": 1, "b": 2}
new = {"a": 1, "b": 3, "c": 4}
# Generate diff
diff = generate_diff(old, new) # {"b": 3, "c": 4}
# Apply diff
result = apply_diff(old, diff) # {"a": 1, "b": 3, "c": 4}