Kalman Filter
The KalmanFilter1D class implements a one-dimensional Kalman filter for smoothing noisy sensor data. It is designed for single-value measurements such as temperature, voltage, or pressure readings. The filter automatically calculates time steps between updates, detects outliers, and adapts its sensitivity based on configurable parameters.
Constructor
from pydoover.utils.kalman import KalmanFilter1D
kf = KalmanFilter1D(
    initial_estimate=None,           # Starting estimate (auto-set from first measurement)
    initial_error_estimate=None,     # Starting uncertainty (auto-calculated)
    process_variance=0.5,            # How fast real values are expected to change
    outlier_protection=True,         # Enable outlier detection
    outlier_threshold=5,             # Outlier sensitivity (multiples of error estimate)
    outlier_variance_multiplier=25,  # How much to distrust outlier measurements
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| initial_estimate | float | None | Starting estimate. If None, uses the first measurement. |
| initial_error_estimate | float | None | Starting uncertainty. If None, auto-calculated from the first measurement variance. |
| process_variance | float | 0.5 | How quickly the real value is expected to change. Higher values make the filter more responsive. |
| outlier_protection | bool | True | Whether to detect and dampen outlier measurements. |
| outlier_threshold | float | 5 | How many multiples of the error estimate a measurement must deviate to be considered an outlier. |
| outlier_variance_multiplier | float | 25 | Multiplier applied to the measurement variance when an outlier is detected, reducing its influence. |
Updating the Filter
Pass each new sensor measurement to update() to get a filtered estimate:
filtered_value = kf.update(
    measurement,                # The raw sensor reading
    measurement_variance=None,  # Measurement noise (default 0.5)
    dt=None,                    # Time step in seconds (auto-calculated if omitted)
    outlier_protection=None,    # Override instance-level outlier protection
    process_variance=None,      # Override instance-level process variance
)
The method returns the updated estimate after incorporating the new measurement. If measurement is None, it returns the previous estimate unchanged.
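To make the predict-and-correct cycle behind update() concrete, here is a minimal sketch of a scalar Kalman step. It is the textbook formulation, not pydoover's implementation: the helper name `kalman_step` and its argument names are hypothetical, and scaling the process variance linearly by `dt` is an assumption.

```python
def kalman_step(estimate, error_estimate, measurement,
                measurement_variance=0.5, process_variance=0.5, dt=1.0):
    """One predict/correct cycle of a textbook scalar Kalman filter (hypothetical helper)."""
    if measurement is None:
        # Mirror the documented behaviour: no measurement, no change.
        return estimate, error_estimate

    # Predict: the value is modelled as constant, so only the uncertainty grows.
    # Assumption: process variance scales linearly with the time step.
    predicted_error = error_estimate + process_variance * dt

    # Correct: blend prediction and measurement according to the Kalman gain.
    gain = predicted_error / (predicted_error + measurement_variance)
    new_estimate = estimate + gain * (measurement - estimate)
    new_error = (1.0 - gain) * predicted_error
    return new_estimate, new_error


estimate, error = 20.0, 1.0
for reading in [20.4, 19.7, 20.1, None, 20.3]:
    estimate, error = kalman_step(estimate, error, reading)
    print(f"reading={reading} estimate={estimate:.3f} error={error:.3f}")
```

The gain always lies between 0 and 1, so each new estimate sits somewhere between the previous estimate and the latest reading.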
Understanding Process Variance
The process_variance parameter controls how much the filter trusts new measurements versus its existing estimate:
- Low values (e.g., 0.1) -- the filter changes slowly, heavily smoothing the data. Good for stable signals where noise is the main concern.
- Default (0.5) -- a balanced starting point for most applications.
- High values (e.g., 1.0+) -- the filter tracks changes quickly but provides less smoothing. Use when the real signal changes rapidly.
The process variance is scaled by the time step (dt) internally, so the filter adapts automatically to irregular update intervals.
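The effect of process variance and time step on responsiveness shows up in the Kalman gain, the weight given to each new measurement. The sketch below uses the textbook scalar formula and assumes the process variance is multiplied by dt before being added to the error estimate; `kalman_gain` is a hypothetical helper, not part of pydoover.

```python
def kalman_gain(error_estimate, process_variance, dt, measurement_variance=0.5):
    """Weight (between 0 and 1) given to a new measurement in a scalar Kalman update."""
    # Assumption: uncertainty grows by process_variance * dt between updates.
    predicted_error = error_estimate + process_variance * dt
    return predicted_error / (predicted_error + measurement_variance)


for process_variance in (0.1, 0.5, 1.0):
    for dt in (1.0, 5.0):
        gain = kalman_gain(0.2, process_variance, dt)
        print(f"process_variance={process_variance} dt={dt} gain={gain:.2f}")
```

A larger process variance or a longer gap between updates both raise the gain, so the estimate moves further toward the new reading.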
Outlier Protection
When outlier protection is enabled, the filter detects measurements that deviate significantly from the current estimate. Instead of rejecting outliers entirely, it increases their measurement variance, reducing their influence on the estimate. This approach handles both genuine outliers (sensor glitches) and legitimate step changes (the filter still moves toward the new value, just more cautiously).
To adjust outlier sensitivity:
- Increase outlier_threshold to make the filter less likely to flag measurements as outliers. Useful when the signal has high natural variability.
- Decrease outlier_variance_multiplier to allow flagged outliers to still have some influence on the estimate.
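The variance-inflation idea can be sketched as follows. This is an illustration only: `dampened_update` is a hypothetical helper, and the outlier test used here (deviation greater than outlier_threshold multiples of the current error estimate) is an assumption drawn from the parameter description, so the exact check in pydoover may differ.

```python
def dampened_update(estimate, error_estimate, measurement,
                    measurement_variance=0.5,
                    outlier_threshold=5, outlier_variance_multiplier=25):
    """Scalar Kalman correction that distrusts, rather than rejects, outliers."""
    deviation = abs(measurement - estimate)
    # Assumption: a reading is an outlier when it deviates by more than
    # outlier_threshold multiples of the current error estimate.
    is_outlier = deviation > outlier_threshold * error_estimate
    if is_outlier:
        # Inflate the measurement variance so the reading carries less weight.
        measurement_variance *= outlier_variance_multiplier

    gain = error_estimate / (error_estimate + measurement_variance)
    new_estimate = estimate + gain * (measurement - estimate)
    return new_estimate, is_outlier


print(dampened_update(20.0, 1.0, 21.0))  # ordinary reading, normal gain
print(dampened_update(20.0, 1.0, 80.0))  # glitch, estimate moves only slightly
```

Because the flagged reading is down-weighted instead of discarded, a sustained step change still pulls the estimate toward the new level over several updates.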
Disabling the Filter
You can disable the filter at runtime so that update() passes measurements through unmodified. This is useful for testing or calibration:
kf.enabled = False
result = kf.update(42.0)  # Returns 42.0 directly
Setter Methods
All parameters can be updated after construction:
| Method | Description |
|---|---|
| set_estimate(value) | Override the current estimate. |
| set_error_estimate(value) | Override the current uncertainty. |
| set_process_variance(value) | Change how responsive the filter is. |
| set_outlier_protection(bool) | Enable or disable outlier detection. |
| set_outlier_threshold(value) | Change the outlier detection threshold. |
| set_outlier_variance_multiplier(value) | Change how much outliers are dampened. |
Decorator Usage
pydoover provides two decorators that automatically apply a Kalman filter to the return value of a function. This is useful in application classes where a method reads a sensor value each loop iteration.
Synchronous Decorator
from pydoover.utils import apply_kalman_filter
class MyApp:
    @apply_kalman_filter(process_variance=0.3, outlier_threshold=4)
    def read_temperature(self):
        # Read raw sensor value (noisy)
        raw = self.platform.fetch_ai(1)
        return raw
Async Decorator
from pydoover.utils import apply_async_kalman_filter
class MyApp:
    @apply_async_kalman_filter(process_variance=0.3)
    async def read_temperature(self):
        raw = await self.platform.fetch_ai(1)
        return raw
Each decorated method maintains its own KalmanFilter1D instance, stored on the class instance. This means multiple methods on the same object each have independent filters.
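One way a decorator can give every method its own filter is to create the filter lazily and store it under a per-method attribute name. The sketch below illustrates that pattern under stated assumptions: `apply_filter`, `RunningMean`, and the `_filter_<name>` attribute are hypothetical, a running mean stands in for the Kalman filter, and this page does not show how pydoover itself stores the instance.

```python
import functools


class RunningMean:
    """Stand-in for a filter object; keeps a simple running mean."""

    def __init__(self):
        self.count = 0
        self.estimate = None

    def update(self, value):
        self.count += 1
        if self.estimate is None:
            self.estimate = value
        else:
            self.estimate += (value - self.estimate) / self.count
        return self.estimate


def apply_filter(func):
    """Hypothetical decorator: one filter per (object, method) pair."""
    attr = f"_filter_{func.__name__}"

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if not hasattr(self, attr):
            setattr(self, attr, RunningMean())  # created lazily on first call
        return getattr(self, attr).update(func(self, *args, **kwargs))

    return wrapper


class Sensors:
    def __init__(self):
        self.temps = iter([10.0, 20.0])
        self.volts = iter([1.0, 3.0])

    @apply_filter
    def read_temperature(self):
        return next(self.temps)

    @apply_filter
    def read_voltage(self):
        return next(self.volts)


s = Sensors()
s.read_temperature(), s.read_voltage()
# Each method averages only its own readings.
print(s.read_temperature(), s.read_voltage())
```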
Overriding Filter Parameters per Call
When calling a decorated function, you can pass special keyword arguments to override filter parameters for that single call:
# Override measurement variance and time step for this call
value = self.read_temperature(kf_measurement_variance=1.0, kf_dt=2.0)
# Override outlier protection for this call
value = self.read_temperature(kf_outlier_protection=False)
# Override process variance for this call
value = self.read_temperature(kf_process_variance=1.0)
Keyword arguments with the kf_ prefix are consumed by the decorator and are not passed to the wrapped function. Unlike the other overrides, kf_process_variance is persistent: it updates the filter instance, so it also applies to subsequent calls.
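The general pattern of a decorator consuming prefixed keyword arguments can be sketched like this. `strip_kf_kwargs` and `read_sensor` are hypothetical names used for illustration; a real decorator would forward the overrides to the filter's update() call, whereas this sketch simply returns them so they can be inspected.

```python
import functools


def strip_kf_kwargs(func):
    """Hypothetical decorator: pull out kf_-prefixed keyword arguments."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Remove every kf_* key and keep its value under the unprefixed name.
        overrides = {
            key[len("kf_"):]: kwargs.pop(key)
            for key in list(kwargs)
            if key.startswith("kf_")
        }
        result = func(*args, **kwargs)  # wrapped function never sees kf_* keys
        return result, overrides

    return wrapper


@strip_kf_kwargs
def read_sensor(channel, scale=1.0):
    return channel * scale


print(read_sensor(2, scale=1.5, kf_dt=2.0, kf_measurement_variance=1.0))
```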
Accessing the Filter Instance
The filter instance is available on the wrapper function after the first call:
self.read_temperature()
kf = self.read_temperature._kalman_filter
print(f"Current estimate: {kf.estimate}")
Practical Example
This example filters noisy temperature readings from an analog input and publishes the smoothed value:
from pydoover.utils import apply_async_kalman_filter, map_reading

class TemperatureMonitor:
    @apply_async_kalman_filter(
        process_variance=0.3,
        outlier_threshold=4,
        outlier_variance_multiplier=20,
    )
    async def get_temperature(self):
        """Read temperature from a 4-20mA sensor on analog input 1."""
        raw_ma = await self.platform.fetch_ai(1)
        # Convert 4-20mA to 0-100 degrees
        temp = map_reading(raw_ma, [0, 100])
        return temp

    async def main_loop(self):
        temperature = await self.get_temperature()
        if temperature is not None:
            await self.update_aggregate({"temperature": round(temperature, 1)})
The first call initialises the filter with the raw reading. Subsequent calls smooth the data, reducing noise while still tracking real temperature changes. If a sensor glitch produces a wildly different reading, outlier protection dampens its effect on the filtered output.
Related Pages
- PID Controller -- use filtered sensor data as input to a PID feedback loop
- Alarms -- monitor filtered values against thresholds
- Utilities Overview -- all available utility modules