
Kalman Filter

The KalmanFilter1D class implements a one-dimensional Kalman filter for smoothing noisy sensor data. It is designed for single-value measurements such as temperature, voltage, or pressure readings. The filter automatically calculates time steps between updates, detects outliers, and adapts its sensitivity based on configurable parameters.

Constructor

from pydoover.utils.kalman import KalmanFilter1D

kf = KalmanFilter1D(
    initial_estimate=None,           # Starting estimate (auto-set from first measurement)
    initial_error_estimate=None,     # Starting uncertainty (auto-calculated)
    process_variance=0.5,            # How fast real values are expected to change
    outlier_protection=True,         # Enable outlier detection
    outlier_threshold=5,             # Outlier sensitivity (multiples of error estimate)
    outlier_variance_multiplier=25,  # How much to distrust outlier measurements
)
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| initial_estimate | float | None | Starting estimate. If None, uses the first measurement. |
| initial_error_estimate | float | None | Starting uncertainty. If None, auto-calculated from the first measurement variance. |
| process_variance | float | 0.5 | How quickly the real value is expected to change. Higher values make the filter more responsive. |
| outlier_protection | bool | True | Whether to detect and dampen outlier measurements. |
| outlier_threshold | float | 5 | How many multiples of the error estimate a measurement must deviate to be considered an outlier. |
| outlier_variance_multiplier | float | 25 | Multiplier applied to the measurement variance when an outlier is detected, reducing its influence. |

Updating the Filter

Pass each new sensor measurement to update() to get a filtered estimate:

filtered_value = kf.update(
    measurement,                     # The raw sensor reading
    measurement_variance=None,       # Measurement noise variance (0.5 is used if None)
    dt=None,                         # Time step in seconds (auto-calculated if omitted)
    outlier_protection=None,         # Override instance-level outlier protection
    process_variance=None,           # Override instance-level process variance
)

The method returns the updated estimate after incorporating the new measurement. If measurement is None, it returns the previous estimate unchanged.
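
The update step can be pictured as a standard scalar Kalman predict-and-correct cycle. The sketch below is an illustration only, not pydoover's implementation: `kalman_step` and its argument names are hypothetical, and the time step is passed in explicitly rather than measured from a clock.

```python
def kalman_step(estimate, error, measurement,
                measurement_variance=0.5, process_variance=0.5, dt=1.0):
    """One predict/correct cycle of a scalar Kalman filter (illustrative sketch)."""
    if measurement is None:
        # No new data: keep the previous state unchanged.
        return estimate, error
    # Predict: uncertainty grows with the elapsed time.
    predicted_error = error + process_variance * dt
    # Correct: the gain weighs the measurement against the prediction.
    gain = predicted_error / (predicted_error + measurement_variance)
    new_estimate = estimate + gain * (measurement - estimate)
    new_error = (1.0 - gain) * predicted_error
    return new_estimate, new_error


estimate, error = 20.0, 1.0
for reading in [20.4, 19.7, 20.1, None, 20.3]:
    estimate, error = kalman_step(estimate, error, reading)
    print(f"reading={reading} estimate={estimate:.3f} error={error:.3f}")
```

A `None` reading leaves both the estimate and its uncertainty untouched, which mirrors the behaviour described above.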

Understanding Process Variance

The process_variance parameter controls how much the filter trusts new measurements versus its existing estimate:

  • Low values (e.g., 0.1): the filter changes slowly, heavily smoothing the data. Good for stable signals where noise is the main concern.
  • Default (0.5): a balanced starting point for most applications.
  • High values (e.g., 1.0+): the filter tracks changes quickly but provides less smoothing. Use when the real signal changes rapidly.

The process variance is scaled by the time step (dt) internally, so the filter adapts automatically to irregular update intervals.
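
To see why a larger process variance or a longer gap between updates makes the filter more responsive, it helps to look at the Kalman gain, the fraction of each new measurement that is blended into the estimate. The helper below is a hypothetical sketch that assumes the textbook scalar formulation, with process variance multiplied by dt; the library's exact scaling may differ.

```python
def kalman_gain(error, process_variance, dt, measurement_variance=0.5):
    """Gain of a scalar Kalman filter after predicting forward by dt seconds."""
    predicted_error = error + process_variance * dt
    return predicted_error / (predicted_error + measurement_variance)


# Compare the gain across process variances and update intervals.
for process_variance in (0.1, 0.5, 1.0):
    for dt in (1.0, 10.0):
        gain = kalman_gain(0.5, process_variance, dt)
        print(f"process_variance={process_variance} dt={dt} gain={gain:.3f}")
```

A gain near 1 means the estimate follows the measurement almost directly; a gain near 0 means the measurement is mostly ignored.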

Outlier Protection

When outlier protection is enabled, the filter detects measurements that deviate significantly from the current estimate. Instead of rejecting outliers entirely, it increases their measurement variance, reducing their influence on the estimate. This approach handles both genuine outliers (sensor glitches) and legitimate step changes (the filter still moves toward the new value, just more cautiously).

To adjust outlier sensitivity:

  • Increase outlier_threshold to make the filter less likely to flag measurements as outliers. Useful when the signal has high natural variability.
  • Decrease outlier_variance_multiplier to give flagged outliers more influence on the estimate. Increase it to dampen them further.
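
The variance-inflation idea can be sketched as follows. This is an assumption-laden illustration rather than the library's code: `effective_variance` and `corrected` are hypothetical helpers, and the outlier test compares the deviation directly against `outlier_threshold` times the error estimate, as the parameter description suggests.

```python
def effective_variance(measurement, estimate, error_estimate,
                       measurement_variance=0.5,
                       outlier_threshold=5, outlier_variance_multiplier=25):
    """Inflate the measurement variance when a reading looks like an outlier."""
    deviation = abs(measurement - estimate)
    if deviation > outlier_threshold * error_estimate:
        return measurement_variance * outlier_variance_multiplier
    return measurement_variance


def corrected(estimate, error_estimate, measurement, variance):
    """Scalar Kalman correction step for a given measurement variance."""
    gain = error_estimate / (error_estimate + variance)
    return estimate + gain * (measurement - estimate)


estimate, error_estimate = 20.0, 1.0
glitch = 80.0  # far outside 5 x error_estimate

unprotected = corrected(estimate, error_estimate, glitch, 0.5)
protected = corrected(
    estimate, error_estimate, glitch,
    effective_variance(glitch, estimate, error_estimate),
)
print(f"unprotected={unprotected:.2f} protected={protected:.2f}")
```

The protected estimate still moves toward the glitch value, only by a much smaller amount, so a genuine step change would be followed over several updates rather than ignored.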

Disabling the Filter

You can disable the filter at runtime so that update() passes measurements through unmodified. This is useful for testing or calibration:

kf.enabled = False
result = kf.update(42.0)  # Returns 42.0 directly

Setter Methods

All parameters can be updated after construction:

| Method | Description |
| --- | --- |
| set_estimate(value) | Override the current estimate. |
| set_error_estimate(value) | Override the current uncertainty. |
| set_process_variance(value) | Change how responsive the filter is. |
| set_outlier_protection(bool) | Enable or disable outlier detection. |
| set_outlier_threshold(value) | Change the outlier detection threshold. |
| set_outlier_variance_multiplier(value) | Change how much outliers are dampened. |

Decorator Usage

pydoover provides two decorators that automatically apply a Kalman filter to the return value of a function. This is useful in application classes where a method reads a sensor value each loop iteration.

Synchronous Decorator

from pydoover.utils import apply_kalman_filter

class MyApp:
    @apply_kalman_filter(process_variance=0.3, outlier_threshold=4)
    def read_temperature(self):
        # Read raw sensor value (noisy)
        raw = self.platform.fetch_ai(1)
        return raw

Async Decorator

from pydoover.utils import apply_async_kalman_filter

class MyApp:
    @apply_async_kalman_filter(process_variance=0.3)
    async def read_temperature(self):
        raw = await self.platform.fetch_ai(1)
        return raw

Each decorated method maintains its own KalmanFilter1D instance, stored on the class instance. This means multiple methods on the same object each have independent filters.
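
One way such per-method, per-object state could be kept is sketched below. Everything here is hypothetical: `apply_filter`, `TinyFilter`, and the `_filter_<name>` attribute are stand-ins chosen for illustration, and the fixed-gain smoother replaces the real Kalman filter to keep the example short.

```python
import functools


class TinyFilter:
    """Stand-in for a real filter: fixed-gain smoothing toward each new value."""

    def __init__(self, gain=0.5):
        self.gain = gain
        self.estimate = None

    def update(self, measurement):
        if self.estimate is None:
            self.estimate = measurement  # first reading seeds the estimate
        else:
            self.estimate += self.gain * (measurement - self.estimate)
        return self.estimate


def apply_filter(**filter_kwargs):
    """Hypothetical decorator: one filter per (object, method) pair."""
    def decorator(func):
        attr = f"_filter_{func.__name__}"

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            flt = getattr(self, attr, None)
            if flt is None:
                flt = TinyFilter(**filter_kwargs)
                setattr(self, attr, flt)  # created lazily on first call
            return flt.update(func(self, *args, **kwargs))
        return wrapper
    return decorator


class Sensor:
    def __init__(self, readings):
        self._readings = iter(readings)

    @apply_filter(gain=0.5)
    def read(self):
        return next(self._readings)


a, b = Sensor([10.0, 20.0]), Sensor([100.0, 100.0])
print(a.read(), a.read())  # a's filter smooths only a's readings
print(b.read())            # b starts from its own first reading
```

Keying the attribute by method name keeps two decorated methods on the same object from sharing state.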

Overriding Filter Parameters per Call

When calling a decorated function, you can pass special keyword arguments to override filter parameters for that single call:

# Override measurement variance and time step for this call
value = self.read_temperature(kf_measurement_variance=1.0, kf_dt=2.0)

# Override outlier protection for this call
value = self.read_temperature(kf_outlier_protection=False)

# Override process variance for this call
value = self.read_temperature(kf_process_variance=1.0)

The kf_-prefixed keyword arguments are consumed by the decorator and are not passed to the wrapped function. Unlike the other overrides, kf_process_variance is persistent: it updates the filter instance, so it also applies to subsequent calls.
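
The separation of kf_-prefixed arguments from ordinary ones might look like the sketch below. `split_kf_kwargs` and `with_overrides` are hypothetical names, not part of pydoover; the wrapper simply returns the collected overrides alongside the value so the split is visible, and it does not model the persistent behaviour of kf_process_variance.

```python
import functools


def split_kf_kwargs(kwargs):
    """Separate kf_-prefixed overrides from the wrapped function's own kwargs."""
    overrides = {k[3:]: v for k, v in kwargs.items() if k.startswith("kf_")}
    passthrough = {k: v for k, v in kwargs.items() if not k.startswith("kf_")}
    return overrides, passthrough


def with_overrides(func):
    """Hypothetical wrapper that hides kf_ arguments from the wrapped function."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        overrides, passthrough = split_kf_kwargs(kwargs)
        value = func(*args, **passthrough)
        # A real decorator would forward the overrides to the filter's update().
        return value, overrides
    return wrapper


@with_overrides
def read(channel=1):
    return 21.5


print(read(channel=2, kf_dt=2.0, kf_measurement_variance=1.0))
```

Because the wrapped function never sees the kf_ arguments, it does not need to declare them in its signature.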

Accessing the Filter Instance

The filter instance is available on the wrapper function after the first call:

self.read_temperature()
kf = self.read_temperature._kalman_filter
print(f"Current estimate: {kf.estimate}")

Practical Example

This example filters noisy temperature readings from an analog input and publishes the smoothed value:

from pydoover.utils import apply_async_kalman_filter, map_reading

class TemperatureMonitor:
    @apply_async_kalman_filter(
        process_variance=0.3,
        outlier_threshold=4,
        outlier_variance_multiplier=20,
    )
    async def get_temperature(self):
        """Read temperature from a 4-20mA sensor on analog input 1."""
        raw_ma = await self.platform.fetch_ai(1)
        # Convert 4-20mA to 0-100 degrees
        temp = map_reading(raw_ma, [0, 100])
        return temp

    async def main_loop(self):
        temperature = await self.get_temperature()
        if temperature is not None:
            await self.update_aggregate({"temperature": round(temperature, 1)})

The first call initialises the filter with the raw reading. Subsequent calls smooth the data, reducing noise while still tracking real temperature changes. If a sensor glitch produces a wildly different reading, outlier protection dampens its effect on the filtered output.
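
For readers unfamiliar with 4-20mA current loops, the conversion mentioned in the example is a linear mapping from the 4-20 mA range onto the engineering range. The function below is a hypothetical stand-in written for illustration; it is not `map_reading`, whose exact signature and behaviour should be checked in the pydoover documentation.

```python
def scale_4_20ma(raw_ma, out_min=0.0, out_max=100.0):
    """Linearly map a 4-20 mA loop current onto [out_min, out_max]."""
    if raw_ma is None:
        return None  # no reading available
    fraction = (raw_ma - 4.0) / (20.0 - 4.0)
    return out_min + fraction * (out_max - out_min)


for ma in (4.0, 12.0, 20.0):
    print(f"{ma} mA -> {scale_4_20ma(ma)} degrees")
```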
