Skip to content

ModbusInterface

The ModbusInterface provides Modbus communication capabilities, supporting both RTU (serial) and TCP connections. It allows reading and writing registers, as well as setting up polling subscriptions.

Import

from pydoover.docker import ModbusInterface, ModbusConfig, ManyModbusConfig

Overview

The Modbus Interface communicates with the modbus_iface bridge container via gRPC (port 50054). This bridge handles all Modbus protocol details, allowing your application to focus on business logic.

The modbus_iface bridge provides:

  • Serial RTU and TCP Modbus connections
  • Reading and writing holding registers, input registers, coils, and discrete inputs
  • Automatic bus configuration from application config
  • Polling subscriptions with callbacks
  • Automatic bus reconnection

Class Definition

class ModbusInterface:
    def __init__(
        self,
        app_key: str,
        modbus_uri: str = "127.0.0.1:50054",
        is_async: bool = None,
        config: Schema = None
    )

Constructor Parameters

ParameterTypeDefaultDescription
app_keystrRequiredApplication identifier
modbus_uristr"127.0.0.1:50054"URI for modbus interface gRPC service
is_asyncboolNoneEnable async mode
configSchemaNoneConfiguration schema with modbus settings

Configuration

ModbusConfig

Define a single Modbus bus in your application configuration:

from pydoover.config import Schema
from pydoover.docker import ModbusConfig

class MyConfig(Schema):
    def __init__(self):
        # Serial RTU configuration
        self.modbus_config = ModbusConfig("Modbus Settings")
        self.modbus_config.type.default = "serial"
        self.modbus_config.name.default = "main_bus"
        self.modbus_config.serial_port.default = "/dev/ttyAMA0"
        self.modbus_config.serial_baud.default = 9600

ManyModbusConfig

Define multiple Modbus buses:

from pydoover.docker import ManyModbusConfig

class MyConfig(Schema):
    def __init__(self):
        self.modbus_config = ManyModbusConfig("Modbus Buses")

ModbusConfig Properties

PropertyTypeDefaultDescription
typeEnum"serial"Bus type: "serial" or "tcp"
nameString"default"Bus identifier
serial_portString"/dev/ttyAMA0"Serial port path
serial_baudInteger9600Baud rate
serial_methodEnum"rtu"Method: "rtu", "ascii", "socket", "tls"
serial_bitsInteger8Data bits
serial_parityEnum"None"Parity: "None", "Even", "Odd"
serial_stopInteger1Stop bits
serial_timeoutNumber0.3Timeout in seconds
tcp_uriString"127.0.0.1:5000"TCP address for TCP mode
tcp_timeoutNumber2.0TCP timeout in seconds

Register Types

Modbus defines four register types, identified by number:

TypeNumberDescriptionRead/Write
Coils1Single bit outputsRead/Write
Discrete Inputs2Single bit inputsRead only
Input Registers316-bit input valuesRead only
Holding Registers416-bit output valuesRead/Write

Bus Management

open_bus()

Manually open a Modbus bus. Usually not needed as buses are opened automatically from config.

def open_bus(
    self,
    bus_type: str = "serial",
    name: str = "default",
    serial_port: str = "/dev/ttyS0",
    serial_baud: int = 9600,
    serial_method: str = "rtu",
    serial_bits: int = 8,
    serial_parity: str = "N",
    serial_stop: int = 1,
    serial_timeout: float = 0.3,
    tcp_uri: str = "127.0.0.1:5000",
    tcp_timeout: int = 2
) -> bool

Example:

# Open serial bus
success = self.modbus_iface.open_bus(
    bus_type="serial",
    name="sensor_bus",
    serial_port="/dev/ttyUSB0",
    serial_baud=19200
)

# Open TCP bus
success = self.modbus_iface.open_bus(
    bus_type="tcp",
    name="plc_bus",
    tcp_uri="192.168.1.100:502"
)

close_bus()

Close a Modbus bus.

def close_bus(self, bus_id: str = "default") -> bool

get_bus_status()

Check if a bus is open and available.

def get_bus_status(self, bus_id: str = "default") -> bool

Example:

if self.modbus_iface.get_bus_status("main_bus"):
    print("Bus is open")
else:
    print("Bus is closed or unavailable")

Reading Registers

read_registers()

Read registers from a Modbus device. Works in both sync and async contexts.

def read_registers(
    self,
    bus_id: str = "default",
    modbus_id: int = 1,
    start_address: int = 0,
    num_registers: int = 1,
    register_type: int = 4,
    configure_bus: bool = True
) -> int | list[int] | None

Parameters:

  • bus_id: Bus identifier
  • modbus_id: Modbus slave ID (1-247)
  • start_address: Starting register address
  • num_registers: Number of registers to read
  • register_type: Register type (1=coils, 2=discrete inputs, 3=input registers, 4=holding registers)
  • configure_bus: Attempt to reconfigure bus if not available

Returns:

  • Single int if one register requested
  • list[int] if multiple registers requested
  • None if request failed

Example:

# Read single holding register
value = self.modbus_iface.read_registers(
    bus_id="main_bus",
    modbus_id=1,
    start_address=100,
    num_registers=1,
    register_type=4
)
print(f"Register 100: {value}")

# Read multiple registers
values = self.modbus_iface.read_registers(
    bus_id="main_bus",
    modbus_id=1,
    start_address=0,
    num_registers=10,
    register_type=3  # Input registers
)
print(f"Registers 0-9: {values}")

read_registers_async()

Async version of read_registers.

values = await self.modbus_iface.read_registers_async(
    bus_id="main_bus",
    modbus_id=1,
    start_address=0,
    num_registers=10
)

Writing Registers

write_registers()

Write values to Modbus registers. Works in both sync and async contexts.

def write_registers(
    self,
    bus_id: str = "default",
    modbus_id: int = 1,
    start_address: int = 0,
    values: list[int] = None,
    register_type: int = 4,
    configure_bus: bool = True
) -> bool

Parameters:

  • bus_id: Bus identifier
  • modbus_id: Modbus slave ID (1-247)
  • start_address: Starting register address
  • values: List of values to write
  • register_type: Register type (1=coils, 4=holding registers)
  • configure_bus: Attempt to reconfigure bus if not available

Returns: True if successful, False otherwise

Example:

# Write single register
success = self.modbus_iface.write_registers(
    bus_id="main_bus",
    modbus_id=1,
    start_address=100,
    values=[1234],
    register_type=4
)

# Write multiple registers
success = self.modbus_iface.write_registers(
    bus_id="main_bus",
    modbus_id=1,
    start_address=0,
    values=[100, 200, 300, 400],
    register_type=4
)

# Write coils
success = self.modbus_iface.write_registers(
    bus_id="main_bus",
    modbus_id=1,
    start_address=0,
    values=[1, 0, 1, 1],  # ON, OFF, ON, ON
    register_type=1
)

write_registers_async()

Async version of write_registers.

success = await self.modbus_iface.write_registers_async(
    bus_id="main_bus",
    modbus_id=1,
    start_address=0,
    values=[100, 200]
)

Polling Subscriptions

add_read_register_subscription()

Create a subscription that periodically reads registers and invokes a callback.

def add_read_register_subscription(
    self,
    bus_id: str = "default",
    modbus_id: int = 1,
    start_address: int = 0,
    num_registers: int = 1,
    register_type: int = 4,
    poll_secs: int = 3,
    callback: Callable[[list[int]], None] = None
)

Parameters:

  • bus_id: Bus identifier
  • modbus_id: Modbus slave ID
  • start_address: Starting register address
  • num_registers: Number of registers to read
  • register_type: Register type
  • poll_secs: Polling interval in seconds
  • callback: Function called with register values on each poll

Example:

def on_sensor_data(values):
    if values is None:
        print("Read failed")
        return

    temperature = values[0] / 10.0  # Assuming 0.1 degree resolution
    humidity = values[1] / 10.0
    print(f"Temp: {temperature}C, Humidity: {humidity}%")

# Subscribe to sensor readings every 5 seconds
self.modbus_iface.add_read_register_subscription(
    bus_id="main_bus",
    modbus_id=1,
    start_address=0,
    num_registers=2,
    register_type=3,
    poll_secs=5,
    callback=on_sensor_data
)

Async callbacks are also supported:

async def on_sensor_data_async(values):
    if values is None:
        return
    await self.process_sensor_data(values)

self.modbus_iface.add_read_register_subscription(
    bus_id="main_bus",
    modbus_id=1,
    start_address=0,
    num_registers=10,
    poll_secs=1,
    callback=on_sensor_data_async
)

Using Through Application

The Application class provides convenience methods for Modbus operations:

class MyApp(Application):
    def main_loop(self):
        # Read registers
        value = self.read_modbus_registers(
            address=100,
            count=1,
            register_type=4,
            modbus_id=1,
            bus_id="main_bus"
        )

        # Write registers
        self.write_modbus_registers(
            address=200,
            values=[value * 2],
            register_type=4,
            modbus_id=1,
            bus_id="main_bus"
        )

        # Add subscription
        self.add_new_modbus_read_subscription(
            address=0,
            count=10,
            register_type=3,
            callback=self.on_data,
            poll_secs=5,
            modbus_id=1,
            bus_id="main_bus"
        )

Connection Testing

test_comms()

Test connection to the modbus interface service.

def test_comms(self, message: str = "Comms Check Message") -> str | None

32-bit Values

Modbus registers are 16-bit. For 32-bit values, use the helper function:

from pydoover.docker.modbus.modbus_iface import two_words_to_32bit_float

# Read two registers that form a 32-bit value
values = self.modbus_iface.read_registers(
    bus_id="main_bus",
    modbus_id=1,
    start_address=100,
    num_registers=2
)

# Combine into 32-bit value
float_value = two_words_to_32bit_float(values[0], values[1])

# Some devices swap word order
float_value = two_words_to_32bit_float(values[0], values[1], swap=True)

Complete Example

from pydoover.docker import Application, run_app, ModbusConfig
from pydoover.config import Schema

class ModbusAppConfig(Schema):
    def __init__(self):
        self.modbus_config = ModbusConfig("Modbus")
        self.modbus_config.type.default = "serial"
        self.modbus_config.name.default = "sensor_bus"
        self.modbus_config.serial_port.default = "/dev/ttyAMA0"
        self.modbus_config.serial_baud.default = 9600

class ModbusApp(Application):
    config: ModbusAppConfig

    def setup(self):
        # Subscribe to temperature sensor (slave ID 1)
        self.modbus_iface.add_read_register_subscription(
            bus_id="sensor_bus",
            modbus_id=1,
            start_address=0,
            num_registers=2,
            register_type=3,  # Input registers
            poll_secs=10,
            callback=self.on_temperature
        )

        # Subscribe to power meter (slave ID 2)
        self.modbus_iface.add_read_register_subscription(
            bus_id="sensor_bus",
            modbus_id=2,
            start_address=0,
            num_registers=4,
            register_type=4,  # Holding registers
            poll_secs=5,
            callback=self.on_power_data
        )

    def on_temperature(self, values):
        if values is None:
            print("Failed to read temperature sensor")
            return

        temp = values[0] / 10.0
        humidity = values[1] / 10.0

        self.set_tag("temperature", temp)
        self.set_tag("humidity", humidity)
        print(f"Temperature: {temp}C, Humidity: {humidity}%")

    def on_power_data(self, values):
        if values is None:
            print("Failed to read power meter")
            return

        voltage = values[0] / 10.0
        current = values[1] / 100.0
        power = values[2]
        energy = values[3]

        self.set_tag("voltage", voltage)
        self.set_tag("current", current)
        self.set_tag("power", power)
        print(f"Power: {power}W at {voltage}V")

    def main_loop(self):
        # Check bus status
        if not self.modbus_iface.get_bus_status("sensor_bus"):
            print("Warning: Modbus bus not available")
            return

        # Manual read example
        setpoint = self.modbus_iface.read_registers(
            bus_id="sensor_bus",
            modbus_id=3,
            start_address=100,
            num_registers=1,
            register_type=4
        )

        if setpoint is not None:
            print(f"Current setpoint: {setpoint}")

            # Write new setpoint if needed
            if self.get_tag("new_setpoint"):
                new_value = self.get_tag("new_setpoint")
                self.modbus_iface.write_registers(
                    bus_id="sensor_bus",
                    modbus_id=3,
                    start_address=100,
                    values=[int(new_value)],
                    register_type=4
                )
                self.set_tag("new_setpoint", None)

if __name__ == "__main__":
    run_app(ModbusApp(config=ModbusAppConfig()))

Error Handling

The interface handles errors gracefully:

  • Failed reads return None
  • Failed writes return False
  • Bus reconnection is attempted automatically when configure_bus=True
  • Subscription callbacks receive None when reads fail
def safe_read(self):
    value = self.modbus_iface.read_registers(
        bus_id="main_bus",
        modbus_id=1,
        start_address=0,
        num_registers=1
    )

    if value is None:
        print("Read failed - device may be offline")
        return None

    return value

Real-World Example: Engine Controller State Parsing

This example from a production pump controller demonstrates reading multiple registers and parsing bit flags to build structured state:

from collections import deque
from enum import Enum
import time

class K37State(Enum):
    uncontactable = "uncontactable"
    off = "off"
    running = "running"
    warming_up = "warming_up"
    cooling_down = "cooling_down"
    k37_errorcode = "error"

class K37StateStore:
    """Parse K37 engine controller Modbus registers into structured state."""

    ERROR_CODES = {
        0: None,
        1: "Low Oil Pressure",
        2: "High Coolant Temp",
        3: "Overspeed",
        4: "Underspeed",
        5: "Low Battery",
        # ... additional error codes
    }

    def __init__(self):
        self.records = deque(maxlen=100)  # State history for validation
        self.state = K37State.uncontactable
        self.last_contact_time = None

    def update(self, values):
        """Parse register values into structured state."""
        if values is None or len(values) < 60:
            self.state = K37State.uncontactable
            return

        self.last_contact_time = time.time()

        # Parse scaled values
        self.load = values[1]
        self.rpm = values[2]
        self.coolant_temp = values[3]
        self.fuel_rate = values[4] / 10.0      # Scale: 0.1 L/hr
        self.sys_voltage = values[5] / 10.0    # Scale: 0.1 V
        self.batt_voltage = values[6] / 10.0
        self.engine_hours = values[7]
        self.watchdog_num = values[22]

        # Parse bit flags from status word (register 10)
        status_word = values[10]
        self.is_auto = bool(status_word & 0x01)
        self.is_running = bool(status_word & 0x02)
        self.is_warming_up = bool(status_word & 0x04)
        self.is_cooling_down = bool(status_word & 0x08)
        self.has_alarm = bool(status_word & 0x10)

        # Parse error code
        error_code = values[15]
        self.error_string = self.ERROR_CODES.get(error_code, f"Unknown: {error_code}")

        # Determine overall state
        if error_code > 0:
            self.state = K37State.k37_errorcode
        elif self.is_warming_up:
            self.state = K37State.warming_up
        elif self.is_cooling_down:
            self.state = K37State.cooling_down
        elif self.is_running:
            self.state = K37State.running
        else:
            self.state = K37State.off

        # Store for history tracking
        self.records.append({
            "time": time.time(),
            "state": self.state,
            "error": error_code,
        })

    @property
    def is_contactable(self):
        if self.last_contact_time is None:
            return False
        return time.time() - self.last_contact_time < 8

    @property
    def has_lost_comms(self):
        return not self.is_contactable

    def check_error_confirmed(self, min_count=3):
        """Check if error appears consistently in recent records."""
        recent = list(self.records)[-min_count:]
        return all(r["state"] == K37State.k37_errorcode for r in recent)

Source: pump_station_controller

Key techniques demonstrated:

  • Scaled values: Divide raw registers by 10 or 100 for decimal precision
  • Bit flag extraction: Use bitwise AND to extract individual status bits
  • Error code mapping: Dictionary lookup for human-readable error messages
  • State history: Track recent states to confirm errors before acting