Skip to content

Platform Interface

The PlatformInterface provides access to the physical hardware on a Doovit device. It connects to the Platform Service via gRPC and exposes methods for digital and analog I/O, pulse counters, system monitoring, and device control.

In a Docker application, the Platform Interface is available as self.platform_iface. The Application class also provides proxy methods (e.g., self.fetch_di(), self.set_do()) that forward to this interface for convenience.

Import

from pydoover.docker import PlatformInterface

Constructor

plt = PlatformInterface(
    app_key="my_app",
    plt_uri="localhost:50053",
)
ParameterTypeDefaultDescription
app_keystr(required)Application identifier.
plt_uristr"localhost:50053"gRPC address of the Platform Service.

The Platform Service runs as a sidecar container on the Doovit. The default address assumes standard Doovit networking.

Digital Inputs

Digital inputs read the state of binary sensors, switches, or other on/off signals connected to the Doovit's DI pins.

fetch_di(*pins)

Read one or more digital input pins. Returns a single bool for one pin, or a list[bool] for multiple pins.

# Read a single digital input
door_open = self.platform_iface.fetch_di(0)
# Returns: True or False

# Read multiple digital inputs at once
di0, di1, di2 = self.platform_iface.fetch_di(0, 1, 2)
# Returns: [True, False, True] (unpacked into three variables)

Reading a single pin returns a boolean value directly. Reading multiple pins returns a list of booleans in the same order as the pin arguments. Multi-pin reads are performed in a single gRPC call for efficiency.

fetch_di_events(di_pin, edge, include_system_events, events_from)

Fetch a history of digital input events (transitions) for a specific pin. This is useful for detecting events that occurred while the application was offline or between loop iterations.

synced, events = self.platform_iface.fetch_di_events(
    di_pin=0,
    edge="rising",
    include_system_events=False,
    events_from=last_check_time,
)
ParameterTypeDescription
di_pinintThe digital input pin number.
edgestrEdge type to filter: "rising", "falling", or "both".
include_system_eventsboolWhether to include system-generated events (e.g., boot).
events_fromdatetime or NoneOnly return events after this timestamp.

Returns a tuple of (synced, events) where synced is a boolean indicating whether the event history is complete, and events is a list of event records with timestamps and edge types.

Digital Outputs

Digital outputs control relays, switches, LEDs, or other binary actuators connected to the Doovit's DO pins.

fetch_do(*pins)

Read the current state of one or more digital output pins.

relay_state = self.platform_iface.fetch_do(0)
# Returns: True (relay is on) or False (relay is off)

This reads the output state as set by the application, not the physical pin level. It confirms what value the output is currently commanded to.

set_do(pin, value)

Set a digital output to a specific state.

# Turn on relay on DO pin 0
self.platform_iface.set_do(0, True)

# Turn off relay on DO pin 1
self.platform_iface.set_do(1, False)

The change takes effect immediately. The Platform Service drives the physical output pin to the requested state.

schedule_do(pin, value, in_secs)

Schedule a digital output change to occur after a delay.

# Turn off the relay after 30 seconds
self.platform_iface.schedule_do(0, False, in_secs=30)
ParameterTypeDescription
pinintThe digital output pin number.
valueboolThe state to set (True for on, False for off).
in_secsfloatSeconds from now until the change occurs.

Scheduled changes are managed by the Platform Service, which means they execute even if your application is busy or restarting. This is useful for timed operations like pulse outputs or delayed shutoffs.

Analog Inputs

Analog inputs read voltage or current levels from sensors connected to the Doovit's AI pins.

fetch_ai(*pins)

Read one or more analog input pins. Returns a single float for one pin, or a list[float] for multiple pins.

# Read a single analog input (e.g., a 4-20mA temperature sensor)
temperature_raw = self.platform_iface.fetch_ai(0)
# Returns: e.g., 12.5 (milliamps or volts, depending on configuration)

# Read multiple analog inputs
ai0, ai1 = self.platform_iface.fetch_ai(0, 1)

The raw value represents the electrical signal on the pin. Your application is responsible for converting this to engineering units (e.g., converting a 4-20mA signal to a temperature in degrees).

Analog Outputs

Analog outputs drive variable voltage or current signals to actuators, valves, or other proportional control devices.

fetch_ao(*pins)

Read the current commanded value of one or more analog output pins.

current_output = self.platform_iface.fetch_ao(0)

set_ao(pin, value)

Set an analog output to a specific value.

# Set analog output to 50% (e.g., half-open valve)
self.platform_iface.set_ao(0, 50.0)

schedule_ao(pin, value, in_secs)

Schedule an analog output change to occur after a delay.

# Ramp down to zero after 60 seconds
self.platform_iface.schedule_ao(0, 0.0, in_secs=60)

The scheduling behaviour is identical to schedule_do(): the Platform Service handles the timer, so the change occurs reliably even if your application restarts.

System Information

These methods read system-level telemetry from the Doovit hardware.

fetch_system_voltage()

Read the system input voltage.

voltage = self.platform_iface.fetch_system_voltage()
print(f"System voltage: {voltage}V")

Useful for monitoring battery levels or power supply health on solar-powered or battery-backed installations.

fetch_system_power()

Read the system power consumption.

power = self.platform_iface.fetch_system_power()
print(f"Power draw: {power}W")

fetch_system_temperature()

Read the internal board temperature.

board_temp = self.platform_iface.fetch_system_temperature()
if board_temp > 70.0:
    self.send_notification(
        message=f"Board temperature high: {board_temp}C",
        severity="warning",
        topic="system_health",
    )

This reads the temperature of the Doovit's compute module, not an external sensor. Use it for thermal management and health monitoring.

fetch_location()

Read the GPS location from the Doovit's GNSS receiver. Returns a Location object.

location = self.platform_iface.fetch_location()
print(f"Lat: {location.latitude}, Lon: {location.longitude}")

The Location object contains latitude, longitude, and additional fix metadata. GPS availability depends on the Doovit's antenna and signal conditions.

fetch_immunity_seconds()

Read the remaining immunity time. Immunity prevents the device from shutting down during critical operations.

remaining = self.platform_iface.fetch_immunity_seconds()

set_immunity_seconds(secs)

Set the immunity period. While immunity is active, the Doovit will not honour shutdown requests.

# Prevent shutdown for the next 5 minutes during a firmware update
self.platform_iface.set_immunity_seconds(300)
Warning
Use Immunity Carefully

Setting a long immunity period prevents the device from shutting down gracefully during that time. Only use this for truly critical operations (firmware updates, data commits) where an interrupted shutdown could cause data loss.

Pulse Counters

Pulse counters track the number of transitions (edges) on a digital input pin. They are designed for flow meters, encoders, tachometers, and other pulse-generating sensors.

get_new_pulse_counter(di, edge, callback, rate_window_secs, auto_start)

Create a new pulse counter on a digital input pin.

# Count rising edges on DI pin 2 (e.g., a flow meter)
flow_counter = self.platform_iface.get_new_pulse_counter(
    di=2,
    edge="rising",
    callback=self.on_flow_pulse,
    rate_window_secs=60,
    auto_start=True,
)
ParameterTypeDescription
diintDigital input pin number to monitor.
edgestrEdge type to count: "rising", "falling", or "both".
callbackcallable or NoneOptional function called on each pulse event.
rate_window_secsintTime window in seconds for rate calculation.
auto_startboolWhether to start counting immediately.

Returns a PulseCounter object for querying count and rate data.

PulseCounter Object

The PulseCounter returned by get_new_pulse_counter() provides the following:

Attribute/MethodTypeDescription
pinintThe digital input pin being monitored.
edgestrThe edge type being counted.
countintTotal number of pulses counted since the counter was started.
pulse_timestampslistTimestamps of recent pulses within the rate window.
start_listener_pulses()methodStart counting (if auto_start was False).
get_pulses_per_minute()floatCurrent pulse rate in pulses per minute, calculated from the rate window.
get_pulses_in_window()intNumber of pulses in the current rate window.
# In the main loop, read the flow rate
def main_loop(self):
    flow_rate = self.flow_counter.get_pulses_per_minute()
    total_pulses = self.flow_counter.count

    self.set_tag("flow_rate", flow_rate)
    self.set_tag("total_flow_pulses", total_pulses)

    # Convert pulses to litres (sensor-specific calibration)
    litres_per_pulse = 0.5
    total_litres = total_pulses * litres_per_pulse
    self.set_tag("total_litres", total_litres)

This example uses the pulse counter's rate and total count to calculate a flow rate and total volume. The rate_window_secs parameter controls the time window over which the rate is averaged, smoothing out short-term variations.

get_new_event_counter(di, edge, callback, rate_window_secs, auto_start)

Create an event counter. This works similarly to a pulse counter but is designed for counting discrete events rather than continuous pulse trains. The API and parameters are identical to get_new_pulse_counter().

Device Control

These methods control the Doovit device itself.

reboot()

Reboot the Doovit device.

self.platform_iface.reboot()
Warning
Reboot Behaviour

Rebooting will restart all containers, including your application. Ensure any critical state is committed to tags or the cloud before calling reboot.

shutdown()

Shut down the Doovit device immediately.

self.platform_iface.shutdown()

schedule_shutdown(time_secs)

Schedule a device shutdown after a delay.

# Shut down in 5 minutes
self.platform_iface.schedule_shutdown(time_secs=300)

The Platform Service manages the timer. The shutdown proceeds even if your application exits before the timer fires.

sync_rtc()

Synchronise the Doovit's real-time clock with the system time.

self.platform_iface.sync_rtc()

This is typically called after a GPS fix to ensure the RTC is accurate. The RTC maintains time during power loss, so periodic synchronisation improves timestamp accuracy.

Practical Examples

Reading Sensors and Controlling a Relay

This example reads a temperature sensor on analog input 0 and controls a cooling fan relay on digital output 0 based on a configurable threshold.

def setup(self):
    self.loop_target_period = 5  # Check every 5 seconds

def main_loop(self):
    temp = self.platform_iface.fetch_ai(0)

    # Convert raw value to degrees (sensor-specific)
    temp_c = (temp - 4.0) / 16.0 * 100.0  # 4-20mA to 0-100C

    self.set_tag("temperature", temp_c)

    # Control fan based on threshold with hysteresis
    fan_on = self.platform_iface.fetch_do(0)
    if temp_c > self.config.fan_on_temp and not fan_on:
        self.platform_iface.set_do(0, True)
    elif temp_c < self.config.fan_off_temp and fan_on:
        self.platform_iface.set_do(0, False)

Monitoring a Flow Meter

This example uses a pulse counter to measure flow from a sensor on digital input 2 and publishes periodic telemetry.

def setup(self):
    self.flow_counter = self.platform_iface.get_new_pulse_counter(
        di=2,
        edge="rising",
        rate_window_secs=60,
        auto_start=True,
    )
    self.loop_target_period = 10

def main_loop(self):
    rate = self.flow_counter.get_pulses_per_minute()
    total = self.flow_counter.count

    self.set_tag("flow_rate_ppm", rate)
    self.set_tag("total_pulses", total)

    self.create_message("flow_telemetry", {
        "flow_rate": rate,
        "total_pulses": total,
    })

System Health Monitoring

This example reads system telemetry and sends alerts when values are outside normal ranges.

def main_loop(self):
    voltage = self.platform_iface.fetch_system_voltage()
    board_temp = self.platform_iface.fetch_system_temperature()
    location = self.platform_iface.fetch_location()

    self.create_message("system_health", {
        "voltage": voltage,
        "board_temp": board_temp,
        "latitude": location.latitude,
        "longitude": location.longitude,
    })

    if voltage < 11.0:
        self.send_notification(
            message=f"Low system voltage: {voltage}V",
            severity="critical",
            topic="power",
        )

Next Steps