Tag Management

Tag managers are the runtime engines behind the tag system. They handle reading from and writing to the underlying tag_values channel, buffering changes for efficient cloud communication, and dispatching subscription callbacks when tags change. The two manager implementations serve different execution contexts: Docker applications and cloud processors.

TagsManagerDocker

TagsManagerDocker is the tag manager for long-running Docker applications on Doover devices. It maintains a local cache of all tag values, buffers outgoing changes, and periodically flushes logs to the cloud.

Buffering Strategy

The Docker tag manager uses a three-tier buffering approach to minimise cloud traffic while ensuring timely delivery of important changes.

Pending aggregate -- Every call to set_tag() accumulates changes in a pending aggregate buffer. At the end of each main loop iteration, commit_tags() flushes this buffer to the Device Agent as a channel aggregate update. This keeps the Device Agent's copy of the tag values current to within one loop period; how quickly they propagate to the cloud depends on the max_age described under Tag Aging.

Periodic log -- In addition to updating the aggregate, the manager accumulates a separate periodic log buffer. Every 15 minutes (configurable via tag_log_interval), this buffer is flushed as a channel message. These messages create historical data points that appear in graphs and reports.

Immediate log -- When a tag update triggers an auto-logging rule (see Tag Types and Auto-Logging) or when set(value, log=True) is called explicitly, the change is promoted to an immediate log buffer. This buffer is flushed at the end of the current loop iteration via commit_tags(), bypassing the 15-minute periodic interval. When a value is promoted to immediate log, it is removed from the periodic log buffer to avoid duplicate logging.

# Normal set -- buffered in pending aggregate + periodic log
await self.tags.temperature.set(22.5)

# Immediate log -- buffered in pending aggregate + immediate log
await self.tags.temperature.set(22.5, log=True)
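
The routing between the three buffers can be pictured with a small standalone model. This is only a sketch of the behaviour described above, not the pydoover implementation: the TagBuffers class and its attribute names are hypothetical, and the log flag stands in for both an explicit log=True and a triggered auto-logging rule.

```python
class TagBuffers:
    """Toy model of the three buffers (hypothetical, not the pydoover class)."""

    def __init__(self):
        self.pending_aggregate = {}  # flushed every loop iteration
        self.periodic_log = {}       # flushed once per log interval
        self.immediate_log = {}      # flushed at the end of the current iteration

    def set_tag(self, key, value, log=False):
        # Every change lands in the pending aggregate.
        self.pending_aggregate[key] = value
        if log:
            # Promote to the immediate log and drop any periodic entry
            # so the same value is not logged twice.
            self.immediate_log[key] = value
            self.periodic_log.pop(key, None)
        else:
            self.periodic_log[key] = value


buffers = TagBuffers()
buffers.set_tag("temperature", 22.5)            # normal set
buffers.set_tag("temperature", 23.0, log=True)  # promoted to immediate log
print(buffers.pending_aggregate, buffers.periodic_log, buffers.immediate_log)
```

After the second call the periodic buffer no longer holds temperature, which mirrors the de-duplication rule for promoted values.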

Tag Aging

The manager adjusts the max_age of aggregate updates based on whether a user is actively viewing the device in the web portal.

| State | Max Age | Behaviour |
|---|---|---|
| Observed | 3 seconds | A user has the device page open. Tags update rapidly. |
| Unobserved | 15 minutes | No active viewers. Tags update at the periodic interval. |

Observation is detected via a doover_ui_fastmode channel. When a user opens the device page, the portal sends heartbeat pings to this channel. The manager checks for recent heartbeats (within the last 2 minutes) to determine observation state.
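
As a rough illustration of that decision, the sketch below picks a max_age from the timestamp of the most recent heartbeat. The function and constant names are hypothetical; only the 3-second, 15-minute, and 2-minute figures come from the description above.

```python
OBSERVED_MAX_AGE = 3           # seconds, while a user is viewing the device
UNOBSERVED_MAX_AGE = 15 * 60   # seconds, when nobody is watching
HEARTBEAT_WINDOW = 2 * 60      # seconds a heartbeat stays "recent"


def aggregate_max_age(last_heartbeat, now):
    """Return max_age in seconds; last_heartbeat is a timestamp or None."""
    observed = (
        last_heartbeat is not None
        and (now - last_heartbeat) <= HEARTBEAT_WINDOW
    )
    return OBSERVED_MAX_AGE if observed else UNOBSERVED_MAX_AGE


now = 1_000_000.0
recent = aggregate_max_age(now - 30, now)   # heartbeat 30 seconds ago
stale = aggregate_max_age(now - 600, now)   # heartbeat 10 minutes ago
never = aggregate_max_age(None, now)        # no heartbeat seen yet
print(recent, stale, never)
```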

commit_tags()

commit_tags() is the primary flush method, called automatically at the end of each main loop iteration by the application framework. It performs three steps in order:

  1. Flush aggregate -- Publishes any dirty pending aggregate changes to the Device Agent
  2. Flush immediate logs -- If any tags were marked for immediate logging, creates a channel message with those values
  3. Flush periodic logs -- If the periodic log interval has elapsed (default 15 minutes) and there are buffered log values, creates a channel message with those values

You rarely need to call commit_tags() yourself. The framework handles it. However, if you need to force a flush mid-loop (e.g., before a long blocking operation), you can call it explicitly:

await self.tags.temperature.set(critical_value, log=True)
await self.tag_manager.commit_tags()  # Force immediate flush
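
The ordering of the three flush steps can be sketched as a plain function over dictionaries. This is an assumption-laden model rather than the real method: the buffer layout, the returned action list, and the last_periodic_flush bookkeeping are all hypothetical.

```python
def commit_tags(buffers, now, last_periodic_flush, log_interval=15 * 60):
    """Return (actions, last_periodic_flush) for one end-of-loop flush."""
    actions = []

    # 1. Flush aggregate: publish any dirty values.
    if buffers["pending_aggregate"]:
        actions.append(("aggregate", dict(buffers["pending_aggregate"])))
        buffers["pending_aggregate"].clear()

    # 2. Flush immediate logs: one message with the promoted values.
    if buffers["immediate_log"]:
        actions.append(("immediate_log", dict(buffers["immediate_log"])))
        buffers["immediate_log"].clear()

    # 3. Flush periodic logs: only once the interval has elapsed.
    if buffers["periodic_log"] and now - last_periodic_flush >= log_interval:
        actions.append(("periodic_log", dict(buffers["periodic_log"])))
        buffers["periodic_log"].clear()
        last_periodic_flush = now

    return actions, last_periodic_flush


buffers = {
    "pending_aggregate": {"temperature": 22.5},
    "immediate_log": {"alarm": True},
    "periodic_log": {"temperature": 22.5},
}
first, last = commit_tags(buffers, now=60.0, last_periodic_flush=0.0)
second, last = commit_tags(buffers, now=900.0, last_periodic_flush=last)
print(first)
print(second)
```

In this model the first call publishes the aggregate and the immediate log but holds the periodic values back; the second call, 15 minutes after the last periodic flush, releases them.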

Tag Subscriptions

Docker tag managers support subscribing to tag changes with callback functions. When a tag value changes (as reported by the Device Agent's aggregate update events), registered callbacks are invoked.

# Subscribe to changes on a specific tag
async def on_temperature_change(key, value):
    print(f"Temperature changed to {value}")

self.tag_manager.subscribe_to_tag(
    "temperature",
    on_temperature_change,
    app_key="my_app",
)

Callbacks can be either synchronous or asynchronous functions. They receive two arguments: the tag key and the new value. Callbacks have a 1-second timeout; if a callback takes longer, it is cancelled and an error is logged.
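
One way such a dispatcher could treat both callback styles is sketched below using only the standard library. The dispatch helper is hypothetical and is not the manager's actual code; in this sketch the timeout is enforced only on awaitable results, and a synchronous callback simply runs to completion.

```python
import asyncio
import inspect
import logging

log = logging.getLogger("tag_subscriptions")


async def dispatch(callback, key, value, timeout=1.0):
    """Invoke a sync or async callback; return True if it finished in time."""
    try:
        result = callback(key, value)
        if inspect.isawaitable(result):
            # wait_for cancels the coroutine if it exceeds the timeout.
            await asyncio.wait_for(result, timeout=timeout)
        return True
    except asyncio.TimeoutError:
        log.error("Callback for %r timed out after %.2fs", key, timeout)
        return False


seen = []


def sync_cb(key, value):
    seen.append((key, value))


async def slow_cb(key, value):
    await asyncio.sleep(0.5)
    seen.append(("slow", value))  # never reached when cancelled


async def main():
    ok_sync = await dispatch(sync_cb, "temperature", 22.5)
    ok_slow = await dispatch(slow_cb, "temperature", 22.5, timeout=0.05)
    return ok_sync, ok_slow


results = asyncio.run(main())
print(results, seen)
```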

To remove a subscription:

self.tag_manager.unsubscribe_from_tag("temperature", app_key="my_app")

Note: Subscription callbacks fire only on changes reported by the Device Agent's aggregate update events, not directly on local set_tag() calls in the same application. If an application sets a tag that it also subscribes to, the callback fires once the round trip through the Device Agent completes, not at the moment set_tag() is called.

TagsManagerProcessor

TagsManagerProcessor is the tag manager for cloud processor execution contexts. Unlike Docker apps, processors are short-lived invocations triggered by channel events. The processor manager operates on an in-memory snapshot of tag values.

How It Works

When a processor is invoked, it receives the current tag values as part of its execution context. The TagsManagerProcessor wraps this snapshot and tracks any changes made during the invocation.

# Inside a processor, tags work the same way
current_temp = self.tags.temperature.get()
await self.tags.status.set("processed")

All changes are buffered in memory. At the end of the processor invocation, commit_tags() writes the updated tag values back to the cloud in a single operation. If record_tag_update is True (the default), a channel message is also created for historical logging.
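
The snapshot-and-commit pattern can be modelled in a few lines. The SnapshotTags class below is a hypothetical stand-in for illustration only, and the returned operation list is an assumption about how the single write and the optional log message might be represented.

```python
class SnapshotTags:
    """Toy model of a processor-side tag buffer (not TagsManagerProcessor)."""

    def __init__(self, snapshot):
        self._values = dict(snapshot)  # copy of the invocation's tag snapshot
        self._changed = {}             # changes made during this invocation

    def get(self, key, default=None):
        return self._values.get(key, default)

    def set(self, key, value):
        self._values[key] = value
        self._changed[key] = value

    def commit(self, record_tag_update=True):
        """Return the operations a single end-of-invocation commit would issue."""
        if not self._changed:
            return []
        ops = [("aggregate", dict(self._changed))]
        if record_tag_update:
            # Also record a message so the change appears in history.
            ops.append(("message", dict(self._changed)))
        self._changed.clear()
        return ops


tags = SnapshotTags({"temperature": 22.5})
current_temp = tags.get("temperature")
tags.set("status", "processed")
ops = tags.commit()
print(current_temp, ops)
```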

Differences from Docker Manager

| Feature | Docker | Processor |
|---|---|---|
| Buffering | Three-tier (aggregate, periodic, immediate) | Single in-memory buffer |
| Subscriptions | Supported | Not supported |
| Flush timing | End of each loop iteration | End of invocation |
| Tag aging | Observation-aware (3s / 15min) | Not applicable |
| Periodic logging | Every 15 minutes | Single commit at end |

Cross-App Tag Access

Both tag managers support reading and writing tags from other applications by specifying an app_key parameter on get_tag() and set_tag().

# Read a tag from another app
other_temp = self.tag_manager.get_tag(
    "temperature",
    app_key="weather_station",
    default=0.0,
)

# Write a tag to another app's namespace
await self.tag_manager.set_tag(
    "alert_status",
    "high_temp",
    app_key="alert_manager",
)

For declarative cross-app access, prefer RemoteTag, which handles resolution through the config schema and supports automatic republishing.

Tag Subscriptions Across Apps

When subscribing to tags from another application, pass the app_key parameter to scope the subscription correctly.

# Subscribe to a tag from a different application
self.tag_manager.subscribe_to_tag(
    "flow_rate",
    self.on_flow_change,
    app_key="flow_meter_app",
)

Remote Tag Resolution

When a Tags subclass contains RemoteTag declarations, the framework calls _resolve_remote_tags() during setup. This method:

  1. Scans the config schema for all TagRef elements and indexes them by reference_name
  2. Matches each declared RemoteTag to its corresponding TagRef
  3. Records the resolved target (app_key, tag_name) for runtime routing
  4. For tags with republish_locally=True, installs a subscription that mirrors upstream values into the local app's namespace and seeds the local mirror with the current upstream value

If a required RemoteTag has no matching TagRef in the config, a ValueError is raised during setup. Optional remote tags (marked with optional=True) are silently skipped when their TagRef is missing or blank.
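
Steps 1 to 3 and the error handling can be sketched with stand-in types. TagRefStub and RemoteTagStub are hypothetical dataclasses, not the real TagRef and RemoteTag classes, and the local republishing from step 4 is left out.

```python
from dataclasses import dataclass


@dataclass
class TagRefStub:  # hypothetical stand-in for a config TagRef element
    reference_name: str
    app_key: str
    tag_name: str


@dataclass
class RemoteTagStub:  # hypothetical stand-in for a RemoteTag declaration
    reference_name: str
    optional: bool = False


def resolve_remote_tags(remote_tags, tag_refs):
    """Map each declared remote tag to its (app_key, tag_name) target."""
    # Step 1: index the config's references by reference_name.
    refs = {ref.reference_name: ref for ref in tag_refs}
    resolved = {}
    for tag in remote_tags:
        # Step 2: match the declaration to its reference.
        ref = refs.get(tag.reference_name)
        if ref is None or not ref.app_key or not ref.tag_name:
            if tag.optional:
                continue  # optional tags are silently skipped
            raise ValueError(
                f"No TagRef configured for remote tag {tag.reference_name!r}"
            )
        # Step 3: record the resolved target for runtime routing.
        resolved[tag.reference_name] = (ref.app_key, ref.tag_name)
    return resolved


refs = [TagRefStub("upstream_flow", "flow_meter_app", "flow_rate")]
declared = [RemoteTagStub("upstream_flow"), RemoteTagStub("spare", optional=True)]
resolved = resolve_remote_tags(declared, refs)
print(resolved)
```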

KeyPath

Tag keys can represent hierarchical paths for nested tag structures. The KeyPath utility normalises these paths and provides methods for navigating nested dictionaries.

from pydoover.state import KeyPath

# Simple key
path = KeyPath("temperature")

# Key scoped to an app
path = KeyPath("temperature", app_key="my_app")

# Look up a value in a nested dict
value = path.lookup_dict(tag_values)

# Check if the path exists
exists = path.in_dict(tag_values)

KeyPath is used internally by the tag managers and is generally not needed in application code. It becomes relevant when working with the lower-level set_tags() method that accepts nested dictionaries.
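
To make the idea of a path lookup concrete, the helper below walks a nested dictionary without using KeyPath itself. It assumes, purely for illustration, that app-scoped values sit under their app key in the dictionary; the actual layout used by the tag managers may differ.

```python
def lookup(tag_values, key, app_key=None, default=None):
    """Hypothetical helper: follow [app_key, key] through nested dicts."""
    parts = [key] if app_key is None else [app_key, key]
    node = tag_values
    for part in parts:
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node


# Assumed layout: app-scoped tags nested under the app key.
tag_values = {"my_app": {"temperature": 22.5}, "uptime": 3600}

scoped = lookup(tag_values, "temperature", app_key="my_app")
plain = lookup(tag_values, "uptime")
missing = lookup(tag_values, "pressure", app_key="my_app", default=0.0)
print(scoped, plain, missing)
```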

Next Steps