ModbusInterface
The ModbusInterface provides Modbus communication capabilities, supporting both RTU (serial) and TCP connections. It allows reading and writing registers, as well as setting up polling subscriptions.
Import
from pydoover.docker import ModbusInterface, ModbusConfig, ManyModbusConfig
Overview
The Modbus Interface communicates with the modbus_iface bridge container via gRPC (port 50054). This bridge handles all Modbus protocol details, allowing your application to focus on business logic.
The modbus_iface bridge provides:
- Serial RTU and TCP Modbus connections
- Reading and writing holding registers, input registers, coils, and discrete inputs
- Automatic bus configuration from application config
- Polling subscriptions with callbacks
- Automatic bus reconnection
Class Definition
class ModbusInterface:
def __init__(
self,
app_key: str,
modbus_uri: str = "127.0.0.1:50054",
is_async: bool = None,
config: Schema = None
)
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
app_key | str | Required | Application identifier |
modbus_uri | str | "127.0.0.1:50054" | URI for modbus interface gRPC service |
is_async | bool | None | Enable async mode |
config | Schema | None | Configuration schema with modbus settings |
Configuration
ModbusConfig
Define a single Modbus bus in your application configuration:
from pydoover.config import Schema
from pydoover.docker import ModbusConfig
class MyConfig(Schema):
def __init__(self):
# Serial RTU configuration
self.modbus_config = ModbusConfig("Modbus Settings")
self.modbus_config.type.default = "serial"
self.modbus_config.name.default = "main_bus"
self.modbus_config.serial_port.default = "/dev/ttyAMA0"
self.modbus_config.serial_baud.default = 9600
ManyModbusConfig
Define multiple Modbus buses:
from pydoover.docker import ManyModbusConfig
class MyConfig(Schema):
def __init__(self):
self.modbus_config = ManyModbusConfig("Modbus Buses")
ModbusConfig Properties
| Property | Type | Default | Description |
|---|---|---|---|
type | Enum | "serial" | Bus type: "serial" or "tcp" |
name | String | "default" | Bus identifier |
serial_port | String | "/dev/ttyAMA0" | Serial port path |
serial_baud | Integer | 9600 | Baud rate |
serial_method | Enum | "rtu" | Method: "rtu", "ascii", "socket", "tls" |
serial_bits | Integer | 8 | Data bits |
serial_parity | Enum | "None" | Parity: "None", "Even", "Odd" |
serial_stop | Integer | 1 | Stop bits |
serial_timeout | Number | 0.3 | Timeout in seconds |
tcp_uri | String | "127.0.0.1:5000" | TCP address for TCP mode |
tcp_timeout | Number | 2.0 | TCP timeout in seconds |
Register Types
Modbus defines four register types, identified by number:
| Type | Number | Description | Read/Write |
|---|---|---|---|
| Coils | 1 | Single bit outputs | Read/Write |
| Discrete Inputs | 2 | Single bit inputs | Read only |
| Input Registers | 3 | 16-bit input values | Read only |
| Holding Registers | 4 | 16-bit output values | Read/Write |
Bus Management
open_bus()
Manually open a Modbus bus. Usually not needed as buses are opened automatically from config.
def open_bus(
self,
bus_type: str = "serial",
name: str = "default",
serial_port: str = "/dev/ttyS0",
serial_baud: int = 9600,
serial_method: str = "rtu",
serial_bits: int = 8,
serial_parity: str = "N",
serial_stop: int = 1,
serial_timeout: float = 0.3,
tcp_uri: str = "127.0.0.1:5000",
tcp_timeout: int = 2
) -> bool
Example:
# Open serial bus
success = self.modbus_iface.open_bus(
bus_type="serial",
name="sensor_bus",
serial_port="/dev/ttyUSB0",
serial_baud=19200
)
# Open TCP bus
success = self.modbus_iface.open_bus(
bus_type="tcp",
name="plc_bus",
tcp_uri="192.168.1.100:502"
)
close_bus()
Close a Modbus bus.
def close_bus(self, bus_id: str = "default") -> bool
get_bus_status()
Check if a bus is open and available.
def get_bus_status(self, bus_id: str = "default") -> bool
Example:
if self.modbus_iface.get_bus_status("main_bus"):
print("Bus is open")
else:
print("Bus is closed or unavailable")
Reading Registers
read_registers()
Read registers from a Modbus device. Works in both sync and async contexts.
def read_registers(
self,
bus_id: str = "default",
modbus_id: int = 1,
start_address: int = 0,
num_registers: int = 1,
register_type: int = 4,
configure_bus: bool = True
) -> int | list[int] | None
Parameters:
bus_id: Bus identifiermodbus_id: Modbus slave ID (1-247)start_address: Starting register addressnum_registers: Number of registers to readregister_type: Register type (1=coils, 2=discrete inputs, 3=input registers, 4=holding registers)configure_bus: Attempt to reconfigure bus if not available
Returns:
- Single
intif one register requested list[int]if multiple registers requestedNoneif request failed
Example:
# Read single holding register
value = self.modbus_iface.read_registers(
bus_id="main_bus",
modbus_id=1,
start_address=100,
num_registers=1,
register_type=4
)
print(f"Register 100: {value}")
# Read multiple registers
values = self.modbus_iface.read_registers(
bus_id="main_bus",
modbus_id=1,
start_address=0,
num_registers=10,
register_type=3 # Input registers
)
print(f"Registers 0-9: {values}")
read_registers_async()
Async version of read_registers.
values = await self.modbus_iface.read_registers_async(
bus_id="main_bus",
modbus_id=1,
start_address=0,
num_registers=10
)
Writing Registers
write_registers()
Write values to Modbus registers. Works in both sync and async contexts.
def write_registers(
self,
bus_id: str = "default",
modbus_id: int = 1,
start_address: int = 0,
values: list[int] = None,
register_type: int = 4,
configure_bus: bool = True
) -> bool
Parameters:
bus_id: Bus identifiermodbus_id: Modbus slave ID (1-247)start_address: Starting register addressvalues: List of values to writeregister_type: Register type (1=coils, 4=holding registers)configure_bus: Attempt to reconfigure bus if not available
Returns: True if successful, False otherwise
Example:
# Write single register
success = self.modbus_iface.write_registers(
bus_id="main_bus",
modbus_id=1,
start_address=100,
values=[1234],
register_type=4
)
# Write multiple registers
success = self.modbus_iface.write_registers(
bus_id="main_bus",
modbus_id=1,
start_address=0,
values=[100, 200, 300, 400],
register_type=4
)
# Write coils
success = self.modbus_iface.write_registers(
bus_id="main_bus",
modbus_id=1,
start_address=0,
values=[1, 0, 1, 1], # ON, OFF, ON, ON
register_type=1
)
write_registers_async()
Async version of write_registers.
success = await self.modbus_iface.write_registers_async(
bus_id="main_bus",
modbus_id=1,
start_address=0,
values=[100, 200]
)
Polling Subscriptions
add_read_register_subscription()
Create a subscription that periodically reads registers and invokes a callback.
def add_read_register_subscription(
self,
bus_id: str = "default",
modbus_id: int = 1,
start_address: int = 0,
num_registers: int = 1,
register_type: int = 4,
poll_secs: int = 3,
callback: Callable[[list[int]], None] = None
)
Parameters:
bus_id: Bus identifiermodbus_id: Modbus slave IDstart_address: Starting register addressnum_registers: Number of registers to readregister_type: Register typepoll_secs: Polling interval in secondscallback: Function called with register values on each poll
Example:
def on_sensor_data(values):
if values is None:
print("Read failed")
return
temperature = values[0] / 10.0 # Assuming 0.1 degree resolution
humidity = values[1] / 10.0
print(f"Temp: {temperature}C, Humidity: {humidity}%")
# Subscribe to sensor readings every 5 seconds
self.modbus_iface.add_read_register_subscription(
bus_id="main_bus",
modbus_id=1,
start_address=0,
num_registers=2,
register_type=3,
poll_secs=5,
callback=on_sensor_data
)
Async callbacks are also supported:
async def on_sensor_data_async(values):
if values is None:
return
await self.process_sensor_data(values)
self.modbus_iface.add_read_register_subscription(
bus_id="main_bus",
modbus_id=1,
start_address=0,
num_registers=10,
poll_secs=1,
callback=on_sensor_data_async
)
Using Through Application
The Application class provides convenience methods for Modbus operations:
class MyApp(Application):
def main_loop(self):
# Read registers
value = self.read_modbus_registers(
address=100,
count=1,
register_type=4,
modbus_id=1,
bus_id="main_bus"
)
# Write registers
self.write_modbus_registers(
address=200,
values=[value * 2],
register_type=4,
modbus_id=1,
bus_id="main_bus"
)
# Add subscription
self.add_new_modbus_read_subscription(
address=0,
count=10,
register_type=3,
callback=self.on_data,
poll_secs=5,
modbus_id=1,
bus_id="main_bus"
)
Connection Testing
test_comms()
Test connection to the modbus interface service.
def test_comms(self, message: str = "Comms Check Message") -> str | None
32-bit Values
Modbus registers are 16-bit. For 32-bit values, use the helper function:
from pydoover.docker.modbus.modbus_iface import two_words_to_32bit_float
# Read two registers that form a 32-bit value
values = self.modbus_iface.read_registers(
bus_id="main_bus",
modbus_id=1,
start_address=100,
num_registers=2
)
# Combine into 32-bit value
float_value = two_words_to_32bit_float(values[0], values[1])
# Some devices swap word order
float_value = two_words_to_32bit_float(values[0], values[1], swap=True)
Complete Example
from pydoover.docker import Application, run_app, ModbusConfig
from pydoover.config import Schema
class ModbusAppConfig(Schema):
def __init__(self):
self.modbus_config = ModbusConfig("Modbus")
self.modbus_config.type.default = "serial"
self.modbus_config.name.default = "sensor_bus"
self.modbus_config.serial_port.default = "/dev/ttyAMA0"
self.modbus_config.serial_baud.default = 9600
class ModbusApp(Application):
config: ModbusAppConfig
def setup(self):
# Subscribe to temperature sensor (slave ID 1)
self.modbus_iface.add_read_register_subscription(
bus_id="sensor_bus",
modbus_id=1,
start_address=0,
num_registers=2,
register_type=3, # Input registers
poll_secs=10,
callback=self.on_temperature
)
# Subscribe to power meter (slave ID 2)
self.modbus_iface.add_read_register_subscription(
bus_id="sensor_bus",
modbus_id=2,
start_address=0,
num_registers=4,
register_type=4, # Holding registers
poll_secs=5,
callback=self.on_power_data
)
def on_temperature(self, values):
if values is None:
print("Failed to read temperature sensor")
return
temp = values[0] / 10.0
humidity = values[1] / 10.0
self.set_tag("temperature", temp)
self.set_tag("humidity", humidity)
print(f"Temperature: {temp}C, Humidity: {humidity}%")
def on_power_data(self, values):
if values is None:
print("Failed to read power meter")
return
voltage = values[0] / 10.0
current = values[1] / 100.0
power = values[2]
energy = values[3]
self.set_tag("voltage", voltage)
self.set_tag("current", current)
self.set_tag("power", power)
print(f"Power: {power}W at {voltage}V")
def main_loop(self):
# Check bus status
if not self.modbus_iface.get_bus_status("sensor_bus"):
print("Warning: Modbus bus not available")
return
# Manual read example
setpoint = self.modbus_iface.read_registers(
bus_id="sensor_bus",
modbus_id=3,
start_address=100,
num_registers=1,
register_type=4
)
if setpoint is not None:
print(f"Current setpoint: {setpoint}")
# Write new setpoint if needed
if self.get_tag("new_setpoint"):
new_value = self.get_tag("new_setpoint")
self.modbus_iface.write_registers(
bus_id="sensor_bus",
modbus_id=3,
start_address=100,
values=[int(new_value)],
register_type=4
)
self.set_tag("new_setpoint", None)
if __name__ == "__main__":
run_app(ModbusApp(config=ModbusAppConfig()))
Error Handling
The interface handles errors gracefully:
- Failed reads return
None - Failed writes return
False - Bus reconnection is attempted automatically when
configure_bus=True - Subscription callbacks receive
Nonewhen reads fail
def safe_read(self):
value = self.modbus_iface.read_registers(
bus_id="main_bus",
modbus_id=1,
start_address=0,
num_registers=1
)
if value is None:
print("Read failed - device may be offline")
return None
return value
Real-World Example: Engine Controller State Parsing
This example from a production pump controller demonstrates reading multiple registers and parsing bit flags to build structured state:
from collections import deque
from enum import Enum
import time
class K37State(Enum):
uncontactable = "uncontactable"
off = "off"
running = "running"
warming_up = "warming_up"
cooling_down = "cooling_down"
k37_errorcode = "error"
class K37StateStore:
"""Parse K37 engine controller Modbus registers into structured state."""
ERROR_CODES = {
0: None,
1: "Low Oil Pressure",
2: "High Coolant Temp",
3: "Overspeed",
4: "Underspeed",
5: "Low Battery",
# ... additional error codes
}
def __init__(self):
self.records = deque(maxlen=100) # State history for validation
self.state = K37State.uncontactable
self.last_contact_time = None
def update(self, values):
"""Parse register values into structured state."""
if values is None or len(values) < 60:
self.state = K37State.uncontactable
return
self.last_contact_time = time.time()
# Parse scaled values
self.load = values[1]
self.rpm = values[2]
self.coolant_temp = values[3]
self.fuel_rate = values[4] / 10.0 # Scale: 0.1 L/hr
self.sys_voltage = values[5] / 10.0 # Scale: 0.1 V
self.batt_voltage = values[6] / 10.0
self.engine_hours = values[7]
self.watchdog_num = values[22]
# Parse bit flags from status word (register 10)
status_word = values[10]
self.is_auto = bool(status_word & 0x01)
self.is_running = bool(status_word & 0x02)
self.is_warming_up = bool(status_word & 0x04)
self.is_cooling_down = bool(status_word & 0x08)
self.has_alarm = bool(status_word & 0x10)
# Parse error code
error_code = values[15]
self.error_string = self.ERROR_CODES.get(error_code, f"Unknown: {error_code}")
# Determine overall state
if error_code > 0:
self.state = K37State.k37_errorcode
elif self.is_warming_up:
self.state = K37State.warming_up
elif self.is_cooling_down:
self.state = K37State.cooling_down
elif self.is_running:
self.state = K37State.running
else:
self.state = K37State.off
# Store for history tracking
self.records.append({
"time": time.time(),
"state": self.state,
"error": error_code,
})
@property
def is_contactable(self):
if self.last_contact_time is None:
return False
return time.time() - self.last_contact_time < 8
@property
def has_lost_comms(self):
return not self.is_contactable
def check_error_confirmed(self, min_count=3):
"""Check if error appears consistently in recent records."""
recent = list(self.records)[-min_count:]
return all(r["state"] == K37State.k37_errorcode for r in recent)
Source: pump_station_controller
Key techniques demonstrated:
- Scaled values: Divide raw registers by 10 or 100 for decimal precision
- Bit flag extraction: Use bitwise AND to extract individual status bits
- Error code mapping: Dictionary lookup for human-readable error messages
- State history: Track recent states to confirm errors before acting