Platform Interface
The PlatformInterface provides access to the physical hardware on a Doovit device. It connects to the Platform Service via gRPC and exposes methods for digital and analog I/O, pulse counters, system monitoring, and device control.
In a Docker application, the Platform Interface is available as self.platform_iface. The Application class also provides proxy methods (e.g., self.fetch_di(), self.set_do()) that forward to this interface for convenience.
Import
from pydoover.docker import PlatformInterface
Constructor
plt = PlatformInterface(
    app_key="my_app",
    plt_uri="localhost:50053",
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| app_key | str | (required) | Application identifier. |
| plt_uri | str | "localhost:50053" | gRPC address of the Platform Service. |
The Platform Service runs as a sidecar container on the Doovit. The default address assumes standard Doovit networking.
Digital Inputs
Digital inputs read the state of binary sensors, switches, or other on/off signals connected to the Doovit's DI pins.
fetch_di(*pins)
Read one or more digital input pins. Returns a single bool for one pin, or a list[bool] for multiple pins.
# Read a single digital input
door_open = self.platform_iface.fetch_di(0)
# Returns: True or False
# Read multiple digital inputs at once
di0, di1, di2 = self.platform_iface.fetch_di(0, 1, 2)
# Returns: [True, False, True] (unpacked into three variables)
For multiple pins, the booleans come back in the same order as the pin arguments. A multi-pin read is performed in a single gRPC call, which is more efficient than several single-pin reads.
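Because the return shape depends on how many pins are requested, code that builds its pin list at runtime may want to normalise the result. The following is a minimal sketch that relies only on the behaviour described above; `as_list` is a hypothetical helper and is not part of pydoover.

```python
from typing import List, Union


def as_list(result: Union[bool, List[bool]]) -> List[bool]:
    """Return a list whether fetch_di() gave back one bool or a list of bools.

    Hypothetical helper (not part of pydoover), based on the documented
    behaviour: one pin -> bool, several pins -> list[bool].
    """
    if isinstance(result, list):
        return list(result)  # copy so callers can modify it freely
    return [result]


# Possible usage inside an application, with pins chosen at runtime:
#   states = as_list(self.platform_iface.fetch_di(*pins))
```

With this wrapper, the calling code can always iterate over the result, regardless of how many pins were configured.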
fetch_di_events(di_pin, edge, include_system_events, events_from)
Fetch a history of digital input events (transitions) for a specific pin. This is useful for detecting events that occurred while the application was offline or between loop iterations.
synced, events = self.platform_iface.fetch_di_events(
    di_pin=0,
    edge="rising",
    include_system_events=False,
    events_from=last_check_time,
)
| Parameter | Type | Description |
|---|---|---|
| di_pin | int | The digital input pin number. |
| edge | str | Edge type to filter: "rising", "falling", or "both". |
| include_system_events | bool | Whether to include system-generated events (e.g., boot). |
| events_from | datetime or None | Only return events after this timestamp. |
Returns a tuple of (synced, events) where synced is a boolean indicating whether the event history is complete, and events is a list of event records with timestamps and edge types.
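One way to use events_from is to remember when the previous poll started and pass that time on the next call. The sketch below is illustrative only: `EdgeEventPoller` and `fake_fetch` are hypothetical names, the use of timezone-aware UTC timestamps is an assumption, and event records are treated as opaque because their fields are not specified here.

```python
from datetime import datetime, timezone
from typing import Any, Callable, List, Optional, Tuple


class EdgeEventPoller:
    """Remember the last poll time so each poll asks only for newer events.

    Hypothetical helper. `fetch` is any callable that accepts the keyword
    arguments documented for fetch_di_events() and returns (synced, events).
    """

    def __init__(self, fetch: Callable[..., Tuple[bool, List[Any]]],
                 di_pin: int, edge: str = "rising"):
        self.fetch = fetch
        self.di_pin = di_pin
        self.edge = edge
        self.last_check: Optional[datetime] = None  # None = no lower bound yet

    def poll(self) -> Tuple[bool, List[Any]]:
        # Take the timestamp before the call: an event that arrives during
        # the call is then never missed, although it may be seen twice.
        started = datetime.now(timezone.utc)  # assumption: UTC-aware times
        synced, events = self.fetch(
            di_pin=self.di_pin,
            edge=self.edge,
            include_system_events=False,
            events_from=self.last_check,
        )
        self.last_check = started  # advance only after a successful fetch
        return synced, events


# Demo with a stand-in fetch; a real application would pass
# self.platform_iface.fetch_di_events instead.
seen_kwargs = []


def fake_fetch(**kwargs):
    seen_kwargs.append(kwargs)
    return True, []


poller = EdgeEventPoller(fake_fetch, di_pin=0)
first = poller.poll()   # first call: events_from is None
second = poller.poll()  # later calls: events_from is the previous poll time
```

If synced comes back False, the history may be incomplete, so an application might choose to fall back to reading the current pin state with fetch_di().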
Digital Outputs
Digital outputs control relays, switches, LEDs, or other binary actuators connected to the Doovit's DO pins.
fetch_do(*pins)
Read the current state of one or more digital output pins.
relay_state = self.platform_iface.fetch_do(0)
# Returns: True (relay is on) or False (relay is off)
This reads the output state as set by the application, not the physical pin level. It confirms what value the output is currently commanded to.
set_do(pin, value)
Set a digital output to a specific state.
# Turn on relay on DO pin 0
self.platform_iface.set_do(0, True)
# Turn off relay on DO pin 1
self.platform_iface.set_do(1, False)
The change takes effect immediately. The Platform Service drives the physical output pin to the requested state.
schedule_do(pin, value, in_secs)
Schedule a digital output change to occur after a delay.
# Turn off the relay after 30 seconds
self.platform_iface.schedule_do(0, False, in_secs=30)
| Parameter | Type | Description |
|---|---|---|
| pin | int | The digital output pin number. |
| value | bool | The state to set (True for on, False for off). |
| in_secs | float | Seconds from now until the change occurs. |
Scheduled changes are managed by the Platform Service, which means they execute even if your application is busy or restarting. This is useful for timed operations like pulse outputs or delayed shutoffs.
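A pulse output of the kind mentioned above can be sketched by combining set_do() with schedule_do(). This is a minimal illustration, not a pydoover API: `pulse_output` is a hypothetical helper, and `RecordingPlatform` is a stand-in that only records calls so the example runs without hardware.

```python
class RecordingPlatform:
    """Stand-in for the platform interface that records calls (demo only)."""

    def __init__(self):
        self.calls = []

    def set_do(self, pin, value):
        self.calls.append(("set_do", pin, value))

    def schedule_do(self, pin, value, in_secs):
        self.calls.append(("schedule_do", pin, value, in_secs))


def pulse_output(platform, pin: int, duration_secs: float) -> None:
    """Switch an output on now and schedule it off after duration_secs."""
    if duration_secs <= 0:
        raise ValueError("duration_secs must be positive")
    platform.set_do(pin, True)
    # The off transition is handed to the Platform Service, so the
    # application does not need to sleep or keep its own timer.
    platform.schedule_do(pin, False, in_secs=duration_secs)


# Demo: a 30-second pulse on DO pin 0.
platform = RecordingPlatform()
pulse_output(platform, pin=0, duration_secs=30)
```

In an application, the first argument would be self.platform_iface rather than the recording stand-in.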
Analog Inputs
Analog inputs read voltage or current levels from sensors connected to the Doovit's AI pins.
fetch_ai(*pins)
Read one or more analog input pins. Returns a single float for one pin, or a list[float] for multiple pins.
# Read a single analog input (e.g., a 4-20mA temperature sensor)
temperature_raw = self.platform_iface.fetch_ai(0)
# Returns: e.g., 12.5 (milliamps or volts, depending on configuration)
# Read multiple analog inputs
ai0, ai1 = self.platform_iface.fetch_ai(0, 1)
The raw value represents the electrical signal on the pin. Your application is responsible for converting this to engineering units (e.g., converting a 4-20mA signal to a temperature in degrees).
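The conversion to engineering units is usually a linear mapping. The sketch below assumes the pin is configured so that fetch_ai() returns milliamps; `scale_4_20ma` is a hypothetical helper, and the sensor range passed to it must come from the sensor's datasheet.

```python
def scale_4_20ma(raw_ma: float, eng_min: float, eng_max: float) -> float:
    """Linearly map a 4-20 mA reading onto the range [eng_min, eng_max].

    Assumes raw_ma is in milliamps: 4 mA maps to eng_min, 20 mA to eng_max.
    """
    return eng_min + (raw_ma - 4.0) / 16.0 * (eng_max - eng_min)


# Example: a sensor whose datasheet range is 0 to 100 degrees C.
mid_scale = scale_4_20ma(12.0, 0.0, 100.0)
```

A reading well below 4 mA falls outside the sensor's range and maps to a value below eng_min, which an application may want to treat as a wiring or sensor fault rather than a measurement.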
Analog Outputs
Analog outputs drive variable voltage or current signals to actuators, valves, or other proportional control devices.
fetch_ao(*pins)
Read the current commanded value of one or more analog output pins.
current_output = self.platform_iface.fetch_ao(0)
set_ao(pin, value)
Set an analog output to a specific value.
# Set analog output to 50% (e.g., half-open valve)
self.platform_iface.set_ao(0, 50.0)
schedule_ao(pin, value, in_secs)
Schedule an analog output change to occur after a delay.
# Ramp down to zero after 60 seconds
self.platform_iface.schedule_ao(0, 0.0, in_secs=60)
The scheduling behaviour is identical to schedule_do(): the Platform Service handles the timer, so the change occurs reliably even if your application restarts.
System Information
These methods read system-level telemetry from the Doovit hardware.
fetch_system_voltage()
Read the system input voltage.
voltage = self.platform_iface.fetch_system_voltage()
print(f"System voltage: {voltage}V")
Useful for monitoring battery levels or power supply health on solar-powered or battery-backed installations.
fetch_system_power()
Read the system power consumption.
power = self.platform_iface.fetch_system_power()
print(f"Power draw: {power}W")
fetch_system_temperature()
Read the internal board temperature.
board_temp = self.platform_iface.fetch_system_temperature()
if board_temp > 70.0:
    self.send_notification(
        message=f"Board temperature high: {board_temp}C",
        severity="warning",
        topic="system_health",
    )
This reads the temperature of the Doovit's compute module, not an external sensor. Use it for thermal management and health monitoring.
fetch_location()
Read the GPS location from the Doovit's GNSS receiver. Returns a Location object.
location = self.platform_iface.fetch_location()
print(f"Lat: {location.latitude}, Lon: {location.longitude}")
The Location object contains latitude, longitude, and additional fix metadata. GPS availability depends on the Doovit's antenna and signal conditions.
fetch_immunity_seconds()
Read the remaining immunity time. Immunity prevents the device from shutting down during critical operations.
remaining = self.platform_iface.fetch_immunity_seconds()
set_immunity_seconds(secs)
Set the immunity period. While immunity is active, the Doovit will not honour shutdown requests.
# Prevent shutdown for the next 5 minutes during a firmware update
self.platform_iface.set_immunity_seconds(300)
Setting a long immunity period prevents the device from shutting down gracefully during that time. Only use this for truly critical operations (firmware updates, data commits) where an interrupted shutdown could cause data loss.
Pulse Counters
Pulse counters track the number of transitions (edges) on a digital input pin. They are designed for flow meters, encoders, tachometers, and other pulse-generating sensors.
get_new_pulse_counter(di, edge, callback, rate_window_secs, auto_start)
Create a new pulse counter on a digital input pin.
# Count rising edges on DI pin 2 (e.g., a flow meter)
self.flow_counter = self.platform_iface.get_new_pulse_counter(
    di=2,
    edge="rising",
    callback=self.on_flow_pulse,
    rate_window_secs=60,
    auto_start=True,
)
| Parameter | Type | Description |
|---|---|---|
| di | int | Digital input pin number to monitor. |
| edge | str | Edge type to count: "rising", "falling", or "both". |
| callback | callable or None | Optional function called on each pulse event. |
| rate_window_secs | int | Time window in seconds for rate calculation. |
| auto_start | bool | Whether to start counting immediately. |
Returns a PulseCounter object for querying count and rate data.
PulseCounter Object
The PulseCounter returned by get_new_pulse_counter() provides the following:
| Attribute/Method | Type | Description |
|---|---|---|
| pin | int | The digital input pin being monitored. |
| edge | str | The edge type being counted. |
| count | int | Total number of pulses counted since the counter was started. |
| pulse_timestamps | list | Timestamps of recent pulses within the rate window. |
| start_listener_pulses() | method | Start counting (if auto_start was False). |
| get_pulses_per_minute() | float | Current pulse rate in pulses per minute, calculated from the rate window. |
| get_pulses_in_window() | int | Number of pulses in the current rate window. |
# In the main loop, read the flow rate
def main_loop(self):
    flow_rate = self.flow_counter.get_pulses_per_minute()
    total_pulses = self.flow_counter.count
    self.set_tag("flow_rate", flow_rate)
    self.set_tag("total_flow_pulses", total_pulses)
    # Convert pulses to litres (sensor-specific calibration)
    litres_per_pulse = 0.5
    total_litres = total_pulses * litres_per_pulse
    self.set_tag("total_litres", total_litres)
This example uses the pulse counter's rate and total count to calculate a flow rate and total volume. The rate_window_secs parameter controls the time window over which the rate is averaged, smoothing out short-term variations.
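To illustrate how a rate window smooths the reading, the sketch below averages pulses over a sliding window and scales the result to pulses per minute. It shows the general technique only and is not necessarily how PulseCounter computes get_pulses_per_minute(); the function name and the use of plain float timestamps in seconds are assumptions.

```python
from typing import Iterable


def pulses_per_minute(timestamps: Iterable[float], now: float,
                      window_secs: float) -> float:
    """Average pulse rate over the last window_secs, in pulses per minute."""
    if window_secs <= 0:
        raise ValueError("window_secs must be positive")
    # Count only pulses that fall inside [now - window_secs, now].
    in_window = sum(1 for t in timestamps if now - window_secs <= t <= now)
    return in_window * 60.0 / window_secs


# Five pulses recorded at these times (seconds); "now" is t = 60.
stamps = [0.0, 10.0, 20.0, 50.0, 55.0]
short_window = pulses_per_minute(stamps, now=60.0, window_secs=30.0)
long_window = pulses_per_minute(stamps, now=60.0, window_secs=60.0)
```

A shorter window reacts faster to changes in flow, while a longer window gives a steadier value at the cost of responsiveness.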
get_new_event_counter(di, edge, callback, rate_window_secs, auto_start)
Create an event counter. This works similarly to a pulse counter but is designed for counting discrete events rather than continuous pulse trains. The API and parameters are identical to get_new_pulse_counter().
Device Control
These methods control the Doovit device itself.
reboot()
Reboot the Doovit device.
self.platform_iface.reboot()
Rebooting will restart all containers, including your application. Ensure any critical state is committed to tags or the cloud before calling reboot.
shutdown()
Shut down the Doovit device immediately.
self.platform_iface.shutdown()
schedule_shutdown(time_secs)
Schedule a device shutdown after a delay.
# Shut down in 5 minutes
self.platform_iface.schedule_shutdown(time_secs=300)
The Platform Service manages the timer. The shutdown proceeds even if your application exits before the timer fires.
sync_rtc()
Synchronise the Doovit's real-time clock with the system time.
self.platform_iface.sync_rtc()
This is typically called after a GPS fix to ensure the RTC is accurate. The RTC maintains time during power loss, so periodic synchronisation improves timestamp accuracy.
Practical Examples
Reading Sensors and Controlling a Relay
This example reads a temperature sensor on analog input 0 and controls a cooling fan relay on digital output 0 based on a configurable threshold.
def setup(self):
    self.loop_target_period = 5  # Check every 5 seconds

def main_loop(self):
    temp = self.platform_iface.fetch_ai(0)
    # Convert raw value to degrees (sensor-specific)
    temp_c = (temp - 4.0) / 16.0 * 100.0  # 4-20mA to 0-100C
    self.set_tag("temperature", temp_c)
    # Control fan based on threshold with hysteresis
    fan_on = self.platform_iface.fetch_do(0)
    if temp_c > self.config.fan_on_temp and not fan_on:
        self.platform_iface.set_do(0, True)
    elif temp_c < self.config.fan_off_temp and fan_on:
        self.platform_iface.set_do(0, False)
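The hysteresis decision in this example can be pulled out into a pure function, which makes it easy to test without hardware. This is a sketch with a hypothetical function name; it assumes the on threshold is configured above the off threshold.

```python
def next_fan_state(temp_c: float, fan_on: bool,
                   on_temp: float, off_temp: float) -> bool:
    """Return the fan state to command, using two thresholds.

    The fan turns on above on_temp and turns off below off_temp. Between
    the two thresholds it keeps its current state, which prevents rapid
    switching when the temperature hovers near a single set point.
    """
    if on_temp <= off_temp:
        raise ValueError("on_temp must be greater than off_temp")
    if not fan_on and temp_c > on_temp:
        return True
    if fan_on and temp_c < off_temp:
        return False
    return fan_on


# With on_temp=30 and off_temp=25, a reading of 28 keeps the current state.
stays_off = next_fan_state(28.0, fan_on=False, on_temp=30.0, off_temp=25.0)
stays_on = next_fan_state(28.0, fan_on=True, on_temp=30.0, off_temp=25.0)
```

The main loop would then call set_do() only when the returned state differs from the value read with fetch_do().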
Monitoring a Flow Meter
This example uses a pulse counter to measure flow from a sensor on digital input 2 and publishes periodic telemetry.
def setup(self):
    self.flow_counter = self.platform_iface.get_new_pulse_counter(
        di=2,
        edge="rising",
        rate_window_secs=60,
        auto_start=True,
    )
    self.loop_target_period = 10

def main_loop(self):
    rate = self.flow_counter.get_pulses_per_minute()
    total = self.flow_counter.count
    self.set_tag("flow_rate_ppm", rate)
    self.set_tag("total_pulses", total)
    self.create_message("flow_telemetry", {
        "flow_rate": rate,
        "total_pulses": total,
    })
System Health Monitoring
This example reads system telemetry and sends alerts when values are outside normal ranges.
def main_loop(self):
    voltage = self.platform_iface.fetch_system_voltage()
    board_temp = self.platform_iface.fetch_system_temperature()
    location = self.platform_iface.fetch_location()
    self.create_message("system_health", {
        "voltage": voltage,
        "board_temp": board_temp,
        "latitude": location.latitude,
        "longitude": location.longitude,
    })
    if voltage < 11.0:
        self.send_notification(
            message=f"Low system voltage: {voltage}V",
            severity="critical",
            topic="power",
        )
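When several limits are monitored, the threshold checks can be collected into one function that returns the alerts to send. The sketch below is illustrative: `check_health` is a hypothetical helper, and the default limits (11.0 V and 70.0 C) are simply the values used in the examples on this page, not recommended settings.

```python
from typing import List, Tuple


def check_health(voltage: float, board_temp: float,
                 min_voltage: float = 11.0,
                 max_board_temp: float = 70.0) -> List[Tuple[str, str, str]]:
    """Return (topic, severity, message) tuples for out-of-range readings."""
    alerts = []
    if voltage < min_voltage:
        alerts.append(("power", "critical",
                       f"Low system voltage: {voltage}V"))
    if board_temp > max_board_temp:
        alerts.append(("system_health", "warning",
                       f"Board temperature high: {board_temp}C"))
    return alerts


# Example readings: low voltage, normal temperature.
example_alerts = check_health(10.5, 40.0)
```

Each returned tuple could then be passed to self.send_notification() as its topic, severity, and message arguments.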
Next Steps
- Application Class -- the Application base class with platform proxies
- Modbus Interface -- Modbus communication
- Device Agent Interface -- cloud communication
- Examples -- complete example applications