PlatformInterface
The PlatformInterface provides access to hardware I/O on Doover devices, including digital inputs/outputs, analog inputs/outputs, system monitoring, and pulse counting.
Import
from pydoover.docker import PlatformInterface, PulseCounter
Overview
The Platform Interface communicates with the platform interface container via gRPC, providing a consistent API for hardware access across different Doover device types. Key capabilities include:
- Reading digital and analog inputs
- Setting digital and analog outputs
- Scheduling timed output changes
- Counting pulses on digital inputs
- Reading system metrics (voltage, power, temperature)
- Getting device location
Supported Hardware
The Platform Interface supports multiple hardware platforms through the platform_iface bridge:
| Platform | Type | DI | DO | AI | AO | Notes |
|---|---|---|---|---|---|---|
| Doovit | Edge compute | Yes | Yes | Yes | Yes | Primary Doover device |
| IGT34 | Industrial | Yes | Yes | Yes | Yes | Most advanced IGT controller |
| IGT33 | Industrial | Yes | Yes | Yes | Yes | Mid-range IGT controller |
| IGT22 | Industrial | Yes | Yes | Yes | No | Entry-level IGT controller |
| RPi4 | SBC | GPIO | GPIO | No | No | Raspberry Pi with HATs |
Additionally, Moxa industrial I/O modules can be connected as slave devices via Modbus TCP for distributed I/O.
See Supported Hardware for detailed platform specifications.
Class Definition
class PlatformInterface:
def __init__(
self,
app_key: str,
plt_uri: str = "localhost:50053",
is_async: bool = False
)
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| app_key | str | Required | Application identifier |
| plt_uri | str | "localhost:50053" | URI for platform interface gRPC service |
| is_async | bool | False | Enable async mode |
Digital Inputs
get_di()
Read digital input values. Works in both sync and async contexts.
def get_di(self, *di: int) -> bool | list[bool]
Parameters:
*di: One or more pin numbers to read
Returns:
- Single bool if one pin requested
- list[bool] if multiple pins requested
- None if request failed
Example:
# Read single input
button_state = self.platform_iface.get_di(0)
if button_state:
    print("Button pressed")
# Read multiple inputs
di0, di1, di2 = self.platform_iface.get_di(0, 1, 2)
print(f"DI0: {di0}, DI1: {di1}, DI2: {di2}")
get_di_async()
Async version of get_di.
button_state = await self.platform_iface.get_di_async(0)
get_di_events()
Get digital input edge events (for offline event detection).
def get_di_events(
self,
di_pin: int,
edge: str,
include_system_events: bool = False,
events_from: int = 0
) -> tuple[bool, list[Event]]
Parameters:
- di_pin: Pin number to check
- edge: "rising", "falling", or "both"
- include_system_events: Include system events like power cycles
- events_from: Starting timestamp in milliseconds
Returns: Tuple of (events_synced, list of Event objects)
Example:
synced, events = self.platform_iface.get_di_events(0, "rising")
for event in events:
    print(f"Event at {event.time}: {event.event}")
Digital Outputs
get_do()
Read digital output values.
def get_do(self, *do: int) -> bool | list[bool]
Example:
relay_state = self.platform_iface.get_do(0)
set_do()
Set digital output values. Works in both sync and async contexts.
def set_do(
self,
do: int | list[int],
value: bool | list[bool]
) -> list[bool] | None
Parameters:
- do: Pin number(s) to set
- value: Value(s) to set. If single value with multiple pins, applies to all
Returns: List of set values, or None on failure
Example:
# Set single output
self.platform_iface.set_do(0, True)
# Set multiple outputs
self.platform_iface.set_do([0, 1], [True, False])
# Set multiple outputs to same value
self.platform_iface.set_do([0, 1, 2], True) # All high
set_do_async()
Async version of set_do.
await self.platform_iface.set_do_async(0, True)
schedule_do()
Schedule a digital output change for a future time.
def schedule_do(
self,
do: int | list[int],
value: bool | list[bool],
in_secs: int
) -> None
Parameters:
- do: Pin number(s) to set
- value: Value(s) to set
- in_secs: Seconds from now to apply the change
Example:
# Turn on relay now, schedule turn off in 60 seconds
self.platform_iface.set_do(0, True)
self.platform_iface.schedule_do(0, False, 60)
# Schedule multiple outputs
self.platform_iface.schedule_do([0, 1], [False, True], 30)
Analog Inputs
get_ai()
Read analog input values. Works in both sync and async contexts.
def get_ai(self, *ai: int) -> float | list[float]
Parameters:
*ai: One or more pin numbers to read
Returns:
- Single float if one pin requested
- list[float] if multiple pins requested
- None if request failed
Example:
# Read single analog input
voltage = self.platform_iface.get_ai(0)
print(f"Voltage: {voltage}V")
# Read multiple analog inputs
ai0, ai1 = self.platform_iface.get_ai(0, 1)
get_ai_async()
Async version of get_ai.
voltage = await self.platform_iface.get_ai_async(0)
Analog Outputs
get_ao()
Read analog output values.
def get_ao(self, *ao: int) -> float | list[float]
Example:
current_setpoint = self.platform_iface.get_ao(0)
set_ao()
Set analog output values. Works in both sync and async contexts.
def set_ao(
self,
ao: int | list[int],
value: float | list[float]
) -> list[bool] | None
Example:
# Set single analog output
self.platform_iface.set_ao(0, 3.3)
# Set multiple analog outputs
self.platform_iface.set_ao([0, 1], [2.5, 4.0])
set_ao_async()
Async version of set_ao.
await self.platform_iface.set_ao_async(0, 3.3)
schedule_ao()
Schedule an analog output change for a future time.
def schedule_ao(
self,
ao: int | list[int],
value: float | list[float],
in_secs: int
) -> None
Example:
# Set the output to 0.0 in 30 seconds (a scheduled change, not a gradual ramp)
self.platform_iface.schedule_ao(0, 0.0, 30)
Pulse Counting
get_new_pulse_counter()
Create a pulse counter for counting edges on a digital input.
def get_new_pulse_counter(
self,
di: int,
edge: str = "rising",
callback: Callable = None,
rate_window_secs: int = 20,
auto_start: bool = True
) -> PulseCounter
Parameters:
- di: Digital input pin number
- edge: "rising", "falling", or "both"
- callback: Function called on each pulse with (pin, value, dt_secs, count, edge)
- rate_window_secs: Window size for rate calculation
- auto_start: Start counting immediately
Example:
def on_pulse(pin, value, dt_secs, count, edge):
    print(f"Pulse {count} on pin {pin}, {dt_secs}s since last")
# Create counter with callback
counter = self.platform_iface.get_new_pulse_counter(
di=0,
edge="rising",
callback=on_pulse,
rate_window_secs=60
)
# Later, get pulse rate
pulses_per_minute = counter.get_pulses_per_minute()
print(f"Rate: {pulses_per_minute} pulses/min")
get_new_event_counter()
Create an event counter for offline event detection.
def get_new_event_counter(
self,
di: int,
edge: str = "rising",
callback: Callable = None,
rate_window_secs: int = 20,
auto_collect: bool = True
) -> PulseCounter
Example:
# Count offline events (useful after device restart)
counter = self.platform_iface.get_new_event_counter(0, "rising")
print(f"Total events: {counter.get_counter()}")
print(f"Events per minute: {counter.get_pulses_per_minute()}")
PulseCounter Class
The PulseCounter class provides pulse counting functionality:
Attributes
| Attribute | Type | Description |
|---|---|---|
| pin | int | Digital input pin number |
| edge | str | Edge type being monitored |
| count | int | Total pulse count |
| pulse_timestamps | list[float] | Timestamps of each pulse |
| rate_window_secs | int | Window for rate calculation |
Methods
# Get total pulse count
count = counter.get_counter()
# Set counter value
counter.set_counter(100)
# Get pulses per minute
rate = counter.get_pulses_per_minute()
# Get pulses within the rate window
recent_pulses = counter.get_pulses_in_window()
# Set rate window
counter.set_rate_window(30) # 30 second window
System Monitoring
get_system_voltage()
Get the system input voltage.
def get_system_voltage(self) -> float
Example:
voltage = self.platform_iface.get_system_voltage()
print(f"System voltage: {voltage}V")
get_system_power()
Get the system input power.
def get_system_power(self) -> float
Example:
power = self.platform_iface.get_system_power()
print(f"System power: {power}W")
get_system_temperature()
Get the system temperature (CPU temperature on Doovit).
def get_system_temperature(self) -> float
Example:
temp = self.platform_iface.get_system_temperature()
print(f"System temperature: {temp}C")
Location
get_location()
Get the device location (requires 4G modem with GPS).
def get_location(self) -> Location
Returns: Location object with latitude, longitude, altitude
Example:
location = self.platform_iface.get_location()
if location:
    print(f"Lat: {location.latitude}")
    print(f"Lon: {location.longitude}")
    print(f"Alt: {location.altitude}")
Shutdown Control
get_immunity_seconds()
Get the current shutdown immunity time.
def get_immunity_seconds(self) -> float
set_immunity_seconds()
Set the shutdown immunity time (the period during which the device ignores shutdown requests).
def set_immunity_seconds(self, immunity_secs: int) -> float
Example:
# Set 5 minute immunity
self.platform_iface.set_immunity_seconds(300)
schedule_shutdown()
Schedule a system shutdown.
def schedule_shutdown(self, time_secs: int) -> None
schedule_startup()
Schedule a system startup (wake from shutdown).
def schedule_startup(self, time_secs: int) -> None
I/O Table
get_io_table()
Get the complete I/O table showing all inputs and outputs.
def get_io_table(self) -> dict
Example:
io_table = self.platform_iface.get_io_table()
print(f"Digital inputs: {io_table.get('di')}")
print(f"Digital outputs: {io_table.get('do')}")
Connection Testing
test_comms()
Test connection to the platform interface service.
def test_comms(self, message: str = "Comms Check Message") -> str | None
Example:
response = self.platform_iface.test_comms("Hello")
if response:
    print(f"Platform interface responded: {response}")
Complete Example
from pydoover.docker import Application, run_app
from pydoover.config import Schema
class IOApp(Application):
    def setup(self):
        # Set up pulse counter for flow meter
        self.flow_counter = self.platform_iface.get_new_pulse_counter(
            di=0,
            edge="rising",
            callback=self.on_flow_pulse,
            rate_window_secs=60
        )
        # Initialize outputs
        self.platform_iface.set_do(0, False) # Relay off
        self.platform_iface.set_ao(0, 0.0) # Valve closed

    async def on_flow_pulse(self, pin, value, dt_secs, count, edge):
        print(f"Flow pulse {count}, rate: {self.flow_counter.get_pulses_per_minute()}/min")

    def main_loop(self):
        # Read sensors
        di0 = self.get_di(0)
        ai0 = self.get_ai(0)
        # Read system status
        voltage = self.platform_iface.get_system_voltage()
        temp = self.platform_iface.get_system_temperature()
        # Control logic
        if ai0 > 3.0: # Threshold exceeded
            self.set_do(0, True) # Turn on relay
            self.set_ao(0, 5.0) # Open valve
        else:
            self.set_do(0, False)
            self.set_ao(0, 0.0)
        # Log status
        print(f"DI0: {di0}, AI0: {ai0}V")
        print(f"System: {voltage}V, {temp}C")
        print(f"Flow: {self.flow_counter.get_counter()} pulses")

if __name__ == "__main__":
    run_app(IOApp(config=Schema()))