Kalman Filter

The pydoover utilities module provides a 1D Kalman filter implementation designed for smoothing sensor readings. The filter includes automatic time step calculation, outlier detection, and decorator-based application for easy integration with sensor reading methods.

Import

from pydoover.utils import (
    apply_kalman_filter,
    apply_async_kalman_filter,
)

Overview

A Kalman filter is an algorithm that uses a series of measurements observed over time to estimate unknown variables. In the context of sensor data, it helps:

  • Smooth noisy sensor readings
  • Detect and handle outliers
  • Provide more stable measurements for control systems

The pydoover implementation is a simple 1D Kalman filter optimized for single-value sensor readings (temperature, voltage, pressure, etc.).
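For orientation, the predict/update cycle of a scalar Kalman filter can be sketched in a few lines. This is a generic textbook filter with a constant-value model, not pydoover's implementation; the class name `Scalar1DKalman` and the `measurement_variance` default of 1.0 are assumptions made for illustration.

```python
class Scalar1DKalman:
    """Textbook scalar Kalman filter (illustrative; not the pydoover class)."""

    def __init__(self, process_variance=0.5, measurement_variance=1.0):
        self.process_variance = process_variance          # Q
        self.measurement_variance = measurement_variance  # R (assumed default)
        self.estimate = None
        self.error_estimate = None

    def update(self, measurement):
        # First sample: adopt the measurement as the initial estimate.
        if self.estimate is None:
            self.estimate = measurement
            self.error_estimate = self.measurement_variance
            return self.estimate

        # Predict: the value is modelled as constant, so only uncertainty grows.
        predicted_error = self.error_estimate + self.process_variance

        # Update: blend prediction and measurement using the Kalman gain.
        gain = predicted_error / (predicted_error + self.measurement_variance)
        self.estimate += gain * (measurement - self.estimate)
        self.error_estimate = (1.0 - gain) * predicted_error
        return self.estimate


kf = Scalar1DKalman()
readings = [20.0, 24.0, 19.0, 23.0, 20.5, 22.5]
smoothed = [kf.update(r) for r in readings]
```

The smoothed sequence swings over a narrower range than the raw readings, which is the behaviour the decorators below apply to a method's return value.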

Decorators

apply_kalman_filter()

A decorator that applies Kalman filtering to the return value of a synchronous method.

Signature

def apply_kalman_filter(
    initial_estimate: float = None,
    initial_error_estimate: float = None,
    process_variance: float = None,
    outlier_protection: bool = None,
    outlier_threshold: float = None,
    outlier_variance_multiplier: float = None,
) -> Callable

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| initial_estimate | float | None | Initial state estimate; uses first measurement if not provided |
| initial_error_estimate | float | None | Initial estimate uncertainty |
| process_variance | float | 0.5 | Process noise variance (Q) |
| outlier_protection | bool | True | Enable outlier detection |
| outlier_threshold | float | 5 | Outlier threshold (multiples of error estimate) |
| outlier_variance_multiplier | float | 25 | Variance multiplier for outliers |

apply_async_kalman_filter()

Async version of apply_kalman_filter() for use with async methods.

It accepts the same parameters as apply_kalman_filter().

Basic Usage

Synchronous Method

from pydoover.utils import apply_kalman_filter

class TemperatureSensor:
    def __init__(self):
        self._raw_sensor = RawSensor()

    @apply_kalman_filter(process_variance=0.5)
    def read_temperature(self):
        """Returns filtered temperature reading"""
        return self._raw_sensor.read()

# Usage
sensor = TemperatureSensor()
for _ in range(100):
    temp = sensor.read_temperature()
    print(f"Filtered temperature: {temp:.2f}")

Asynchronous Method

from pydoover.utils import apply_async_kalman_filter

class AsyncTemperatureSensor:
    def __init__(self):
        self._raw_sensor = AsyncRawSensor()

    @apply_async_kalman_filter(process_variance=0.5)
    async def read_temperature(self):
        """Returns filtered temperature reading"""
        return await self._raw_sensor.read()

# Usage
import asyncio

async def main():
    sensor = AsyncTemperatureSensor()
    for _ in range(100):
        temp = await sensor.read_temperature()
        print(f"Filtered temperature: {temp:.2f}")

asyncio.run(main())

Runtime Parameters

When calling a decorated method, you can override filter parameters using special keyword arguments prefixed with kf_:

| Keyword Argument | Description |
| --- | --- |
| kf_measurement_variance | Variance of current measurement |
| kf_dt | Override automatic time delta |
| kf_outlier_protection | Override outlier protection for this call |
| kf_process_variance | Override process variance for this call |

Example

@apply_kalman_filter()
def read_sensor(self):
    return self._sensor.read()

# Normal call
value = sensor.read_sensor()

# With higher measurement uncertainty
value = sensor.read_sensor(kf_measurement_variance=2.0)

# Disable outlier protection for this reading
value = sensor.read_sensor(kf_outlier_protection=False)

# Specify exact time delta (useful for testing)
value = sensor.read_sensor(kf_dt=0.1)
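One way such per-call overrides could be separated from the wrapped method's own arguments is to strip every `kf_`-prefixed keyword before forwarding the call. The helper `split_kf_kwargs` below is hypothetical and only illustrates the idea; how pydoover handles these keywords internally is not described on this page.

```python
def split_kf_kwargs(kwargs, prefix="kf_"):
    """Separate filter overrides from the arguments meant for the wrapped method."""
    overrides = {}
    passthrough = {}
    for key, value in kwargs.items():
        if key.startswith(prefix):
            overrides[key[len(prefix):]] = value  # "kf_dt" -> "dt"
        else:
            passthrough[key] = value
    return overrides, passthrough


# "channel" stands in for an ordinary argument of the wrapped method.
overrides, passthrough = split_kf_kwargs(
    {"kf_dt": 0.1, "kf_measurement_variance": 2.0, "channel": 3}
)
```

Here `overrides` would be applied to the filter for this call only, while `passthrough` reaches the sensor method unchanged.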

Accessing the Filter Instance

The Kalman filter instance is accessible via the wrapper for advanced use cases:

@apply_kalman_filter(process_variance=0.5)
def read_temperature(self):
    return self._raw_sensor.read()

# After calling the method at least once
temp = sensor.read_temperature()

# Access the filter
kf = sensor.read_temperature._kalman_filter

# Read filter state
print(f"Current estimate: {kf.estimate}")
print(f"Error estimate: {kf.error_estimate}")
print(f"Kalman gain: {kf.kalman_gain}")

# Modify filter settings
kf.set_process_variance(1.0)
kf.set_outlier_threshold(3)

# Disable filter (pass-through mode)
kf.enabled = False

Parameter Tuning

Process Variance

The process_variance parameter (Q) sets how much the filter expects the true value to drift between readings. A larger value shifts trust away from the previous estimate and toward new measurements:

| Value | Effect |
| --- | --- |
| Lower (0.1) | More smoothing, slower response to real changes |
| Default (0.5) | Balanced smoothing and responsiveness |
| Higher (1.0+) | Less smoothing, faster response to changes |

# Heavy smoothing for very noisy sensor
@apply_kalman_filter(process_variance=0.1)
def read_noisy_sensor(self):
    return self._sensor.read()

# Light smoothing for cleaner sensor
@apply_kalman_filter(process_variance=1.0)
def read_clean_sensor(self):
    return self._sensor.read()
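The trade-off between smoothing and responsiveness can be checked numerically. In a textbook scalar filter the Kalman gain settles to a steady value that depends on the ratio of process variance to measurement variance. The sketch below iterates to that value for the three settings discussed in this section. It assumes a fixed measurement variance of 1.0 and a unit time step, which may differ from what pydoover uses, and `steady_state_gain` is a name made up for this example.

```python
def steady_state_gain(process_variance, measurement_variance=1.0, iterations=200):
    """Iterate the scalar predict/update recursion until the gain settles."""
    error = measurement_variance  # arbitrary starting uncertainty
    gain = 0.0
    for _ in range(iterations):
        predicted = error + process_variance
        gain = predicted / (predicted + measurement_variance)
        error = (1.0 - gain) * predicted
    return gain


gains = {q: steady_state_gain(q) for q in (0.1, 0.5, 1.0)}
for q, gain in gains.items():
    print(f"process_variance={q}: steady-state gain {gain:.3f}")
```

A larger gain means each new reading moves the estimate further, which is why a higher process variance gives less smoothing and a faster response.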

Outlier Protection

Outlier protection prevents sudden spikes from significantly affecting the estimate:

# Strict outlier detection (more conservative)
@apply_kalman_filter(
    outlier_protection=True,
    outlier_threshold=3,           # Flag values 3x error estimate away
    outlier_variance_multiplier=50  # Heavily discount outliers
)
def read_sensor(self):
    return self._sensor.read()

# Relaxed outlier detection
@apply_kalman_filter(
    outlier_protection=True,
    outlier_threshold=10,          # Only flag extreme outliers
    outlier_variance_multiplier=10  # Moderate discount
)
def read_sensor(self):
    return self._sensor.read()

# Disable outlier protection
@apply_kalman_filter(outlier_protection=False)
def read_sensor(self):
    return self._sensor.read()
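The two outlier settings can be read as a gate followed by a penalty: a reading that deviates from the current estimate by more than `outlier_threshold` times the error estimate has its measurement variance multiplied by `outlier_variance_multiplier`. The sketch below follows that reading of the parameter descriptions. It is an assumption about the mechanism, not pydoover's code, and `effective_measurement_variance` is a hypothetical helper.

```python
def effective_measurement_variance(
    measurement,
    estimate,
    error_estimate,
    measurement_variance=1.0,
    outlier_threshold=5,
    outlier_variance_multiplier=25,
):
    """Return the variance to use for this reading, inflated if it looks like an outlier."""
    deviation = abs(measurement - estimate)
    if deviation > outlier_threshold * error_estimate:
        # Treat the reading as far less trustworthy instead of discarding it.
        return measurement_variance * outlier_variance_multiplier
    return measurement_variance


normal = effective_measurement_variance(25.4, estimate=25.0, error_estimate=0.5)
spike = effective_measurement_variance(60.0, estimate=25.0, error_estimate=0.5)
```

With the inflated variance the Kalman gain for that reading drops, so a spike nudges the estimate only slightly rather than being ignored outright.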

Integration with pydoover Applications

Basic Application Example

import asyncio

from pydoover.docker import Application
from pydoover.utils import apply_async_kalman_filter

class SensorApplication(Application):
    def setup(self):
        self.platform = self.get_platform_interface()

    @apply_async_kalman_filter(process_variance=0.5)
    async def read_temperature(self):
        """Read and filter temperature from analog input 0"""
        raw_value = self.platform.get_analog_input(0)
        # Convert raw value to temperature
        return (raw_value / 4095.0) * 100.0  # 0-100 degrees

    @apply_async_kalman_filter(process_variance=0.3)
    async def read_pressure(self):
        """Read and filter pressure from analog input 1"""
        raw_value = self.platform.get_analog_input(1)
        # Convert raw value to pressure
        return (raw_value / 4095.0) * 1000.0  # 0-1000 kPa

    async def main_loop(self):
        while self.running:
            temp = await self.read_temperature()
            pressure = await self.read_pressure()

            self.ui_manager.update_variable("temperature", temp)
            self.ui_manager.update_variable("pressure", pressure)

            await asyncio.sleep(0.1)

Combined with PID Control

import asyncio

from pydoover.docker import Application
from pydoover.utils import apply_async_kalman_filter, PID

class TemperatureController(Application):
    def setup(self):
        self.platform = self.get_platform_interface()
        self.pid = PID(
            Kp=2.0, Ki=0.1, Kd=0.05,
            setpoint=25.0,
            output_limits=(0, 100)
        )

    @apply_async_kalman_filter(process_variance=0.5)
    async def read_temperature(self):
        """Filtered temperature reading"""
        raw = self.platform.get_analog_input(0)
        # convert_to_celsius is your own conversion helper (not shown here)
        return self.convert_to_celsius(raw)

    async def main_loop(self):
        while self.running:
            # Get filtered temperature
            temp = await self.read_temperature()

            # PID control with clean input
            power = self.pid.update(temp)
            self.platform.set_analog_output(0, power)

            await asyncio.sleep(0.1)

Multiple Sensors

Each decorated method gets its own independent Kalman filter instance:

class MultiSensor:
    @apply_kalman_filter(process_variance=0.5)
    def read_sensor_1(self):
        return self._read_raw(1)

    @apply_kalman_filter(process_variance=0.3)
    def read_sensor_2(self):
        return self._read_raw(2)

    @apply_kalman_filter(process_variance=1.0)
    def read_sensor_3(self):
        return self._read_raw(3)

# Each sensor has independent filtering
sensor = MultiSensor()
val1 = sensor.read_sensor_1()  # Uses filter with variance 0.5
val2 = sensor.read_sensor_2()  # Uses filter with variance 0.3
val3 = sensor.read_sensor_3()  # Uses filter with variance 1.0
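The independence of the filters follows naturally if the decorator creates its state once per decorated function. The toy decorator below shows that pattern with plain exponential smoothing standing in for the Kalman filter; `smooth`, `_state`, and `TwoSensors` are illustrative names and not part of pydoover.

```python
import functools


def smooth(alpha):
    """Toy decorator: one private smoothing state per decorated function."""
    def decorator(func):
        state = {"estimate": None}  # created once per decorated method

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            value = func(*args, **kwargs)
            if state["estimate"] is None:
                state["estimate"] = value
            else:
                state["estimate"] += alpha * (value - state["estimate"])
            return state["estimate"]

        wrapper._state = state  # exposed on the wrapper for inspection
        return wrapper
    return decorator


class TwoSensors:
    @smooth(alpha=0.5)
    def read_a(self):
        return 10.0

    @smooth(alpha=0.5)
    def read_b(self):
        return 99.0


sensors = TwoSensors()
a = sensors.read_a()
b = sensors.read_b()
```

In this toy version the state lives on the decorated function, so two instances of `TwoSensors` would share it. Whether pydoover behaves the same way is not stated on this page, so it is worth checking if you create several instances of one sensor class.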

Debugging

Enable debug logging to see filter internals:

import logging

# Enable debug logging for kalman module
logging.getLogger("pydoover.utils.kalman").setLevel(logging.DEBUG)

# Or access filter directly
@apply_kalman_filter()
def read_sensor(self):
    return self._sensor.read()

# Enable debug mode on filter instance
sensor.read_sensor()  # Initialize filter
sensor.read_sensor._kalman_filter.debug = True

# Now filter will log detailed information
sensor.read_sensor()  # Logs: "Measurement: X, Estimate: Y, Error estimate: Z"

Filter State Management

Resetting the Filter

To reset a filter to its initial state:

# Access the filter and reset
sensor.read_sensor._kalman_filter.estimate = None
sensor.read_sensor._kalman_filter.error_estimate = None
sensor.read_sensor._kalman_filter.last_timestamp = None

Seeding with Known Value

If you know the approximate starting value:

@apply_kalman_filter(
    initial_estimate=25.0,          # Expected starting temperature
    initial_error_estimate=1.0      # Confidence in initial estimate
)
def read_temperature(self):
    return self._sensor.read()

Comparison: Raw vs Filtered

import random
import time
from pydoover.utils import apply_kalman_filter

class SensorDemo:
    def __init__(self):
        self.raw_values = []
        self.filtered_values = []
        self._last_raw = None

    def read_raw(self):
        """Simulated noisy sensor"""
        self._last_raw = 25.0 + random.gauss(0, 2)  # 25 degrees with noise
        return self._last_raw

    @apply_kalman_filter(process_variance=0.5)
    def read_filtered(self):
        return self.read_raw()

    def demo(self, n=50):
        for _ in range(n):
            # Sample once per iteration so the raw and filtered values
            # printed side by side come from the same reading
            filtered = self.read_filtered()
            raw = self._last_raw

            self.raw_values.append(raw)
            self.filtered_values.append(filtered)

            print(f"Raw: {raw:.2f}, Filtered: {filtered:.2f}")
            time.sleep(0.1)

# Run demo
demo = SensorDemo()
demo.demo()
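To put a number on the comparison, the spread of the two recorded series can be compared once the demo has run. The helper below is a small sketch using the standard library; `noise_reduction` is a hypothetical name, and the sample lists are made-up values standing in for `demo.raw_values` and `demo.filtered_values`.

```python
import statistics


def noise_reduction(raw_values, filtered_values):
    """Ratio of filtered to raw standard deviation (lower means smoother)."""
    return statistics.pstdev(filtered_values) / statistics.pstdev(raw_values)


# Made-up sample values; in practice pass demo.raw_values and demo.filtered_values.
raw = [24.1, 26.3, 23.8, 25.9, 24.6, 25.4]
filtered = [24.1, 25.2, 24.6, 25.1, 24.9, 25.1]
ratio = noise_reduction(raw, filtered)
print(f"Filtered spread is {ratio:.0%} of the raw spread")
```

A ratio well below 1 indicates the filter is removing noise. A ratio close to 1 suggests the process variance may be set too high for the sensor.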
