Skip to content

PlatformInterface

The PlatformInterface provides access to hardware I/O on Doover devices, including digital inputs/outputs, analog inputs/outputs, system monitoring, and pulse counting.

Import

from pydoover.docker import PlatformInterface, PulseCounter

Overview

The Platform Interface communicates with the platform interface container via gRPC, providing a consistent API for hardware access across different Doover device types. Key capabilities include:

  • Reading digital and analog inputs
  • Setting digital and analog outputs
  • Scheduling timed output changes
  • Counting pulses on digital inputs
  • Reading system metrics (voltage, power, temperature)
  • Getting device location

Supported Hardware

The Platform Interface supports multiple hardware platforms through the platform_iface bridge:

PlatformTypeDIDOAIAONotes
DoovitEdge computeYesYesYesYesPrimary Doover device
IGT34IndustrialYesYesYesYesMost advanced IGT controller
IGT33IndustrialYesYesYesYesMid-range IGT controller
IGT22IndustrialYesYesYesNoEntry-level IGT controller
RPi4SBCGPIOGPIONoNoRaspberry Pi with HATs

Additionally, Moxa industrial I/O modules can be connected as slave devices via Modbus TCP for distributed I/O.

See Supported Hardware for detailed platform specifications.

Class Definition

class PlatformInterface:
    def __init__(
        self,
        app_key: str,
        plt_uri: str = "localhost:50053",
        is_async: bool = False
    )

Constructor Parameters

ParameterTypeDefaultDescription
app_keystrRequiredApplication identifier
plt_uristr"localhost:50053"URI for platform interface gRPC service
is_asyncboolFalseEnable async mode

Digital Inputs

get_di()

Read digital input values. Works in both sync and async contexts.

def get_di(self, *di: int) -> bool | list[bool]

Parameters:

  • *di: One or more pin numbers to read

Returns:

  • Single bool if one pin requested
  • list[bool] if multiple pins requested
  • None if request failed

Example:

# Read single input
button_state = self.platform_iface.get_di(0)
if button_state:
    print("Button pressed")

# Read multiple inputs
di0, di1, di2 = self.platform_iface.get_di(0, 1, 2)
print(f"DI0: {di0}, DI1: {di1}, DI2: {di2}")

get_di_async()

Async version of get_di.

button_state = await self.platform_iface.get_di_async(0)

get_di_events()

Get digital input edge events (for offline event detection).

def get_di_events(
    self,
    di_pin: int,
    edge: str,
    include_system_events: bool = False,
    events_from: int = 0
) -> tuple[bool, list[Event]]

Parameters:

  • di_pin: Pin number to check
  • edge: "rising", "falling", or "both"
  • include_system_events: Include system events like power cycles
  • events_from: Starting timestamp in milliseconds

Returns: Tuple of (events_synced, list of Event objects)

Example:

synced, events = self.platform_iface.get_di_events(0, "rising")
for event in events:
    print(f"Event at {event.time}: {event.event}")

Digital Outputs

get_do()

Read digital output values.

def get_do(self, *do: int) -> bool | list[bool]

Example:

relay_state = self.platform_iface.get_do(0)

set_do()

Set digital output values. Works in both sync and async contexts.

def set_do(
    self,
    do: int | list[int],
    value: bool | list[bool]
) -> list[bool] | None

Parameters:

  • do: Pin number(s) to set
  • value: Value(s) to set. If single value with multiple pins, applies to all

Returns: List of set values, or None on failure

Example:

# Set single output
self.platform_iface.set_do(0, True)

# Set multiple outputs
self.platform_iface.set_do([0, 1], [True, False])

# Set multiple outputs to same value
self.platform_iface.set_do([0, 1, 2], True)  # All high

set_do_async()

Async version of set_do.

await self.platform_iface.set_do_async(0, True)

schedule_do()

Schedule a digital output change for a future time.

def schedule_do(
    self,
    do: int | list[int],
    value: bool | list[bool],
    in_secs: int
) -> None

Parameters:

  • do: Pin number(s) to set
  • value: Value(s) to set
  • in_secs: Seconds from now to apply the change

Example:

# Turn on relay now, schedule turn off in 60 seconds
self.platform_iface.set_do(0, True)
self.platform_iface.schedule_do(0, False, 60)

# Schedule multiple outputs
self.platform_iface.schedule_do([0, 1], [False, True], 30)

Analog Inputs

get_ai()

Read analog input values. Works in both sync and async contexts.

def get_ai(self, *ai: int) -> float | list[float]

Parameters:

  • *ai: One or more pin numbers to read

Returns:

  • Single float if one pin requested
  • list[float] if multiple pins requested
  • None if request failed

Example:

# Read single analog input
voltage = self.platform_iface.get_ai(0)
print(f"Voltage: {voltage}V")

# Read multiple analog inputs
ai0, ai1 = self.platform_iface.get_ai(0, 1)

get_ai_async()

Async version of get_ai.

voltage = await self.platform_iface.get_ai_async(0)

Analog Outputs

get_ao()

Read analog output values.

def get_ao(self, *ao: int) -> float | list[float]

Example:

current_setpoint = self.platform_iface.get_ao(0)

set_ao()

Set analog output values. Works in both sync and async contexts.

def set_ao(
    self,
    ao: int | list[int],
    value: float | list[float]
) -> list[bool] | None

Example:

# Set single analog output
self.platform_iface.set_ao(0, 3.3)

# Set multiple analog outputs
self.platform_iface.set_ao([0, 1], [2.5, 4.0])

set_ao_async()

Async version of set_ao.

await self.platform_iface.set_ao_async(0, 3.3)

schedule_ao()

Schedule an analog output change for a future time.

def schedule_ao(
    self,
    ao: int | list[int],
    value: float | list[float],
    in_secs: int
) -> None

Example:

# Ramp down voltage in 30 seconds
self.platform_iface.schedule_ao(0, 0.0, 30)

Pulse Counting

get_new_pulse_counter()

Create a pulse counter for counting edges on a digital input.

def get_new_pulse_counter(
    self,
    di: int,
    edge: str = "rising",
    callback: Callable = None,
    rate_window_secs: int = 20,
    auto_start: bool = True
) -> PulseCounter

Parameters:

  • di: Digital input pin number
  • edge: "rising", "falling", or "both"
  • callback: Function called on each pulse with (pin, value, dt_secs, count, edge)
  • rate_window_secs: Window size for rate calculation
  • auto_start: Start counting immediately

Example:

def on_pulse(pin, value, dt_secs, count, edge):
    print(f"Pulse {count} on pin {pin}, {dt_secs}s since last")

# Create counter with callback
counter = self.platform_iface.get_new_pulse_counter(
    di=0,
    edge="rising",
    callback=on_pulse,
    rate_window_secs=60
)

# Later, get pulse rate
pulses_per_minute = counter.get_pulses_per_minute()
print(f"Rate: {pulses_per_minute} pulses/min")

get_new_event_counter()

Create an event counter for offline event detection.

def get_new_event_counter(
    self,
    di: int,
    edge: str = "rising",
    callback: Callable = None,
    rate_window_secs: int = 20,
    auto_collect: bool = True
) -> PulseCounter

Example:

# Count offline events (useful after device restart)
counter = self.platform_iface.get_new_event_counter(0, "rising")
print(f"Total events: {counter.get_counter()}")
print(f"Events per minute: {counter.get_pulses_per_minute()}")

PulseCounter Class

The PulseCounter class provides pulse counting functionality:

Attributes

AttributeTypeDescription
pinintDigital input pin number
edgestrEdge type being monitored
countintTotal pulse count
pulse_timestampslist[float]Timestamps of each pulse
rate_window_secsintWindow for rate calculation

Methods

# Get total pulse count
count = counter.get_counter()

# Set counter value
counter.set_counter(100)

# Get pulses per minute
rate = counter.get_pulses_per_minute()

# Get pulses within the rate window
recent_pulses = counter.get_pulses_in_window()

# Set rate window
counter.set_rate_window(30)  # 30 second window

System Monitoring

get_system_voltage()

Get the system input voltage.

def get_system_voltage(self) -> float

Example:

voltage = self.platform_iface.get_system_voltage()
print(f"System voltage: {voltage}V")

get_system_power()

Get the system input power.

def get_system_power(self) -> float

Example:

power = self.platform_iface.get_system_power()
print(f"System power: {power}W")

get_system_temperature()

Get the system temperature (CPU temperature on Doovit).

def get_system_temperature(self) -> float

Example:

temp = self.platform_iface.get_system_temperature()
print(f"System temperature: {temp}C")

Location

get_location()

Get the device location (requires 4G modem with GPS).

def get_location(self) -> Location

Returns: Location object with latitude, longitude, altitude

Example:

location = self.platform_iface.get_location()
if location:
    print(f"Lat: {location.latitude}")
    print(f"Lon: {location.longitude}")
    print(f"Alt: {location.altitude}")

Shutdown Control

get_immunity_seconds()

Get the current shutdown immunity time.

def get_immunity_seconds(self) -> float

set_immunity_seconds()

Set the shutdown immunity time (time device ignores shutdown requests).

def set_immunity_seconds(self, immunity_secs: int) -> float

Example:

# Set 5 minute immunity
self.platform_iface.set_immunity_seconds(300)

schedule_shutdown()

Schedule a system shutdown.

def schedule_shutdown(self, time_secs: int) -> None

schedule_startup()

Schedule a system startup (wake from shutdown).

def schedule_startup(self, time_secs: int) -> None

I/O Table

get_io_table()

Get the complete I/O table showing all inputs and outputs.

def get_io_table(self) -> dict

Example:

io_table = self.platform_iface.get_io_table()
print(f"Digital inputs: {io_table.get('di')}")
print(f"Digital outputs: {io_table.get('do')}")

Connection Testing

test_comms()

Test connection to the platform interface service.

def test_comms(self, message: str = "Comms Check Message") -> str | None

Example:

response = self.platform_iface.test_comms("Hello")
if response:
    print(f"Platform interface responded: {response}")

Complete Example

from pydoover.docker import Application, run_app
from pydoover.config import Schema

class IOApp(Application):
    def setup(self):
        # Set up pulse counter for flow meter
        self.flow_counter = self.platform_iface.get_new_pulse_counter(
            di=0,
            edge="rising",
            callback=self.on_flow_pulse,
            rate_window_secs=60
        )

        # Initialize outputs
        self.platform_iface.set_do(0, False)  # Relay off
        self.platform_iface.set_ao(0, 0.0)    # Valve closed

    async def on_flow_pulse(self, pin, value, dt_secs, count, edge):
        print(f"Flow pulse {count}, rate: {self.flow_counter.get_pulses_per_minute()}/min")

    def main_loop(self):
        # Read sensors
        di0 = self.get_di(0)
        ai0 = self.get_ai(0)

        # Read system status
        voltage = self.platform_iface.get_system_voltage()
        temp = self.platform_iface.get_system_temperature()

        # Control logic
        if ai0 > 3.0:  # Threshold exceeded
            self.set_do(0, True)   # Turn on relay
            self.set_ao(0, 5.0)    # Open valve
        else:
            self.set_do(0, False)
            self.set_ao(0, 0.0)

        # Log status
        print(f"DI0: {di0}, AI0: {ai0}V")
        print(f"System: {voltage}V, {temp}C")
        print(f"Flow: {self.flow_counter.get_counter()} pulses")

if __name__ == "__main__":
    run_app(IOApp(config=Schema()))