Kalman Filter
The pydoover utilities module provides a 1D Kalman filter implementation designed for smoothing sensor readings. The filter includes automatic time step calculation, outlier detection, and decorator-based application for easy integration with sensor reading methods.
Import
from pydoover.utils import (
    apply_kalman_filter,
    apply_async_kalman_filter,
)
Overview
A Kalman filter is an algorithm that uses a series of measurements observed over time to estimate unknown variables. In the context of sensor data, it helps:
- Smooth noisy sensor readings
- Detect and handle outliers
- Provide more stable measurements for control systems
The pydoover implementation is a simple 1D Kalman filter optimized for single-value sensor readings (temperature, voltage, pressure, etc.).
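To make the idea concrete, here is a minimal sketch of a scalar Kalman filter. It is not the pydoover implementation: the class name `Simple1DKalman`, the default measurement variance of 1.0, the starting error estimate of 1.0, and the choice to scale process variance by `dt` are all assumptions made for illustration.

```python
class Simple1DKalman:
    """Illustrative scalar Kalman filter; not the pydoover implementation."""

    def __init__(self, process_variance=0.5, measurement_variance=1.0):
        self.process_variance = process_variance          # Q (assumed per unit time)
        self.measurement_variance = measurement_variance  # R (assumed default)
        self.estimate = None                              # no prior until first reading
        self.error_estimate = 1.0                         # P (assumed starting value)

    def update(self, measurement, dt=1.0):
        if self.estimate is None:
            # No prior: adopt the first measurement as the estimate.
            self.estimate = measurement
            return self.estimate
        # Predict: the value is modelled as constant, so only uncertainty grows.
        self.error_estimate += self.process_variance * dt
        # Update: the gain weighs prior uncertainty against measurement noise.
        gain = self.error_estimate / (self.error_estimate + self.measurement_variance)
        self.estimate += gain * (measurement - self.estimate)
        self.error_estimate *= 1.0 - gain
        return self.estimate


kf = Simple1DKalman()
readings = [25.0, 27.0, 23.0, 26.0, 24.0]
smoothed = [kf.update(z) for z in readings]
```

Each smoothed value is a weighted average of the previous estimate and the new reading, so it always lies between the two.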
Decorators
apply_kalman_filter()
A decorator that applies Kalman filtering to the return value of a synchronous method.
Signature
def apply_kalman_filter(
    initial_estimate: float = None,
    initial_error_estimate: float = None,
    process_variance: float = None,
    outlier_protection: bool = None,
    outlier_threshold: float = None,
    outlier_variance_multiplier: float = None,
) -> Callable
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| initial_estimate | float | None | Initial state estimate; uses first measurement if not provided |
| initial_error_estimate | float | None | Initial estimate uncertainty |
| process_variance | float | 0.5 | Process noise variance (Q) |
| outlier_protection | bool | True | Enable outlier detection |
| outlier_threshold | float | 5 | Outlier threshold (multiples of error estimate) |
| outlier_variance_multiplier | float | 25 | Variance multiplier for outliers |
apply_async_kalman_filter()
Async version of apply_kalman_filter() for use with async methods.
Same parameters as apply_kalman_filter().
Basic Usage
Synchronous Method
from pydoover.utils import apply_kalman_filter

class TemperatureSensor:
    def __init__(self):
        self._raw_sensor = RawSensor()

    @apply_kalman_filter(process_variance=0.5)
    def read_temperature(self):
        """Returns filtered temperature reading"""
        return self._raw_sensor.read()

# Usage
sensor = TemperatureSensor()
for _ in range(100):
    temp = sensor.read_temperature()
    print(f"Filtered temperature: {temp:.2f}")
Asynchronous Method
import asyncio
from pydoover.utils import apply_async_kalman_filter

class AsyncTemperatureSensor:
    def __init__(self):
        self._raw_sensor = AsyncRawSensor()

    @apply_async_kalman_filter(process_variance=0.5)
    async def read_temperature(self):
        """Returns filtered temperature reading"""
        return await self._raw_sensor.read()

# Usage
async def main():
    sensor = AsyncTemperatureSensor()
    for _ in range(100):
        temp = await sensor.read_temperature()
        print(f"Filtered temperature: {temp:.2f}")

asyncio.run(main())
Runtime Parameters
When calling a decorated method, you can override filter parameters using special keyword arguments prefixed with kf_:
| Keyword Argument | Description |
|---|---|
| kf_measurement_variance | Variance of current measurement |
| kf_dt | Override automatic time delta |
| kf_outlier_protection | Override outlier protection for this call |
| kf_process_variance | Override process variance for this call |
Example
@apply_kalman_filter()
def read_sensor(self):
    return self._sensor.read()

# Normal call
value = sensor.read_sensor()
# With higher measurement uncertainty
value = sensor.read_sensor(kf_measurement_variance=2.0)
# Disable outlier protection for this reading
value = sensor.read_sensor(kf_outlier_protection=False)
# Specify exact time delta (useful for testing)
value = sensor.read_sensor(kf_dt=0.1)
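The sketch below shows one way a decorator could separate `kf_`-prefixed overrides from the wrapped method's own keyword arguments. It is a guess at the general mechanism, not pydoover's code: `split_kf_kwargs`, `with_overrides`, and the `last_overrides` attribute are hypothetical names used only for this illustration.

```python
import functools


def split_kf_kwargs(kwargs):
    """Separate 'kf_'-prefixed overrides from the method's own kwargs."""
    overrides = {k[3:]: v for k, v in kwargs.items() if k.startswith("kf_")}
    passthrough = {k: v for k, v in kwargs.items() if not k.startswith("kf_")}
    return overrides, passthrough


def with_overrides(func):
    """Hypothetical decorator: strips kf_ kwargs before calling func."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        overrides, passthrough = split_kf_kwargs(kwargs)
        wrapper.last_overrides = overrides  # recorded only for this demo
        return func(*args, **passthrough)
    wrapper.last_overrides = {}
    return wrapper


@with_overrides
def read(channel=0):
    # Stand-in for a sensor read; it never sees the kf_ arguments.
    return 20.0 + channel


value = read(channel=1, kf_dt=0.1, kf_outlier_protection=False)
```

The wrapped function receives only `channel=1`, while the per-call overrides stay with the wrapper. That matches why the methods in the example above can accept `kf_` arguments without declaring them.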
Accessing the Filter Instance
For advanced use cases, the Kalman filter instance is exposed as the `_kalman_filter` attribute of the decorated method:
@apply_kalman_filter(process_variance=0.5)
def read_temperature(self):
    return self._raw_sensor.read()

# After calling the method at least once
temp = sensor.read_temperature()
# Access the filter
kf = sensor.read_temperature._kalman_filter
# Read filter state
print(f"Current estimate: {kf.estimate}")
print(f"Error estimate: {kf.error_estimate}")
print(f"Kalman gain: {kf.kalman_gain}")
# Modify filter settings
kf.set_process_variance(1.0)
kf.set_outlier_threshold(3)
# Disable filter (pass-through mode)
kf.enabled = False
Parameter Tuning
Process Variance
The process_variance parameter controls how much the filter trusts new measurements versus its previous estimate:
| Value | Effect |
|---|---|
| Lower (0.1) | More smoothing, slower response to real changes |
| Default (0.5) | Balanced smoothing and responsiveness |
| Higher (1.0+) | Less smoothing, faster response to changes |
# Heavy smoothing for very noisy sensor
@apply_kalman_filter(process_variance=0.1)
def read_noisy_sensor(self):
    return self._sensor.read()

# Light smoothing for cleaner sensor
@apply_kalman_filter(process_variance=1.0)
def read_clean_sensor(self):
    return self._sensor.read()
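To see why a larger process variance means less smoothing, it helps to look at the gain a textbook scalar Kalman filter settles on. The sketch below assumes a measurement variance of 1.0 and a fixed time step, neither of which is confirmed for pydoover; `steady_state_gain` is a hypothetical helper, not a library function.

```python
def steady_state_gain(process_variance, measurement_variance=1.0, steps=200):
    """Iterate the scalar predict/update cycle until the gain settles."""
    error = 1.0  # assumed starting error estimate
    gain = 0.0
    for _ in range(steps):
        error += process_variance                       # predict
        gain = error / (error + measurement_variance)   # Kalman gain
        error *= 1.0 - gain                             # update
    return gain


# Gain reached for the three process variance values in the table above.
gains = {q: steady_state_gain(q) for q in (0.1, 0.5, 1.0)}
```

Under these assumptions the gain settles at roughly 0.27, 0.50, and 0.62 for process variances of 0.1, 0.5, and 1.0. A gain near 0 keeps the previous estimate; a gain near 1 follows each new measurement.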
Outlier Protection
Outlier protection prevents sudden spikes from significantly affecting the estimate:
# Strict outlier detection (more conservative)
@apply_kalman_filter(
    outlier_protection=True,
    outlier_threshold=3,             # Flag values 3x error estimate away
    outlier_variance_multiplier=50   # Heavily discount outliers
)
def read_sensor(self):
    return self._sensor.read()

# Relaxed outlier detection
@apply_kalman_filter(
    outlier_protection=True,
    outlier_threshold=10,            # Only flag extreme outliers
    outlier_variance_multiplier=10   # Moderate discount
)
def read_sensor(self):
    return self._sensor.read()

# Disable outlier protection
@apply_kalman_filter(outlier_protection=False)
def read_sensor(self):
    return self._sensor.read()
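One plausible reading of these parameters is sketched below: a measurement further than `threshold` multiples of the error estimate from the current estimate has its variance inflated by `multiplier`, which lowers the gain for that update. The exact rule in pydoover may differ. The function names and the measurement variance of 1.0 are assumptions.

```python
def effective_measurement_variance(
    measurement,
    estimate,
    error_estimate,
    measurement_variance=1.0,  # assumed baseline R
    threshold=5.0,
    multiplier=25.0,
):
    """Inflate the measurement variance when the reading looks like an outlier."""
    deviation = abs(measurement - estimate)
    if deviation > threshold * error_estimate:
        return measurement_variance * multiplier
    return measurement_variance


def gain(error_estimate, measurement_variance):
    """Scalar Kalman gain for a given prior uncertainty and measurement noise."""
    return error_estimate / (error_estimate + measurement_variance)


normal_r = effective_measurement_variance(25.5, estimate=25.0, error_estimate=1.0)
spike_r = effective_measurement_variance(60.0, estimate=25.0, error_estimate=1.0)
normal_gain = gain(1.0, normal_r)
spike_gain = gain(1.0, spike_r)
```

In this sketch the spike is not discarded. It still moves the estimate, but with a much smaller gain, so a persistent real change can eventually pull the estimate along.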
Integration with pydoover Applications
Basic Application Example
import asyncio

from pydoover.docker import Application
from pydoover.utils import apply_async_kalman_filter

class SensorApplication(Application):
    def setup(self):
        self.platform = self.get_platform_interface()

    @apply_async_kalman_filter(process_variance=0.5)
    async def read_temperature(self):
        """Read and filter temperature from analog input 0"""
        raw_value = self.platform.get_analog_input(0)
        # Convert raw value to temperature
        return (raw_value / 4095.0) * 100.0  # 0-100 degrees

    @apply_async_kalman_filter(process_variance=0.3)
    async def read_pressure(self):
        """Read and filter pressure from analog input 1"""
        raw_value = self.platform.get_analog_input(1)
        # Convert raw value to pressure
        return (raw_value / 4095.0) * 1000.0  # 0-1000 kPa

    async def main_loop(self):
        while self.running:
            temp = await self.read_temperature()
            pressure = await self.read_pressure()
            self.ui_manager.update_variable("temperature", temp)
            self.ui_manager.update_variable("pressure", pressure)
            await asyncio.sleep(0.1)
Combined with PID Control
import asyncio

from pydoover.docker import Application
from pydoover.utils import apply_async_kalman_filter, PID

class TemperatureController(Application):
    def setup(self):
        self.platform = self.get_platform_interface()
        self.pid = PID(
            Kp=2.0, Ki=0.1, Kd=0.05,
            setpoint=25.0,
            output_limits=(0, 100)
        )

    @apply_async_kalman_filter(process_variance=0.5)
    async def read_temperature(self):
        """Filtered temperature reading"""
        raw = self.platform.get_analog_input(0)
        # convert_to_celsius is application-specific and not shown here
        return self.convert_to_celsius(raw)

    async def main_loop(self):
        while self.running:
            # Get filtered temperature
            temp = await self.read_temperature()
            # PID control with clean input
            power = self.pid.update(temp)
            self.platform.set_analog_output(0, power)
            await asyncio.sleep(0.1)
Multiple Sensors
Each decorated method gets its own independent Kalman filter instance:
class MultiSensor:
    @apply_kalman_filter(process_variance=0.5)
    def read_sensor_1(self):
        return self._read_raw(1)

    @apply_kalman_filter(process_variance=0.3)
    def read_sensor_2(self):
        return self._read_raw(2)

    @apply_kalman_filter(process_variance=1.0)
    def read_sensor_3(self):
        return self._read_raw(3)

# Each sensor has independent filtering
sensor = MultiSensor()
val1 = sensor.read_sensor_1()  # Uses filter with variance 0.5
val2 = sensor.read_sensor_2()  # Uses filter with variance 0.3
val3 = sensor.read_sensor_3()  # Uses filter with variance 1.0
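Independence here is per decorated method. Whether two instances of the same class also get separate filters depends on where the decorator keeps its state, which this page does not specify. If the state lives on the wrapper function (the `_kalman_filter` attribute access shown earlier is consistent with that, but does not prove it), all instances would share it. The sketch below uses a hypothetical `with_state` decorator, not part of pydoover, to show that plain Python behaviour.

```python
import functools


def with_state(func):
    """Hypothetical decorator that keeps a call counter on the wrapper."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.calls += 1  # state stored on the function, not on self
        return func(*args, **kwargs)
    wrapper.calls = 0
    return wrapper


class Probe:
    @with_state
    def read(self):
        return 1.0


a, b = Probe(), Probe()
a.read()
b.read()
shared_calls = Probe.read.calls  # both calls were counted in one place
```

If you create several instances of a sensor class, it is worth checking whether their filtered outputs influence each other before relying on them independently.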
Debugging
Enable debug logging to see filter internals:
import logging

# Attach a handler so DEBUG records are actually emitted
# (skip this if your application already configures logging)
logging.basicConfig()

# Enable debug logging for kalman module
logging.getLogger("pydoover.utils.kalman").setLevel(logging.DEBUG)

# Or access filter directly
@apply_kalman_filter()
def read_sensor(self):
    return self._sensor.read()

# Enable debug mode on filter instance
sensor.read_sensor()  # Initialize filter
sensor.read_sensor._kalman_filter.debug = True

# Now filter will log detailed information
sensor.read_sensor()  # Logs: "Measurement: X, Estimate: Y, Error estimate: Z"
Filter State Management
Resetting the Filter
To reset a filter to its initial state:
# Access the filter and reset
sensor.read_sensor._kalman_filter.estimate = None
sensor.read_sensor._kalman_filter.error_estimate = None
sensor.read_sensor._kalman_filter.last_timestamp = None
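If you reset filters in more than one place, the three assignments can be wrapped in a small helper. `reset_filter` is a hypothetical convenience function, not part of pydoover. The sketch exercises it on a stand-in object carrying the three attribute names used above, so it runs without the library.

```python
from types import SimpleNamespace


def reset_filter(kf):
    """Clear the three state attributes so the next reading re-seeds the filter."""
    kf.estimate = None
    kf.error_estimate = None
    kf.last_timestamp = None


# Stand-in for a filter instance; with pydoover you would pass
# sensor.read_sensor._kalman_filter instead.
stand_in = SimpleNamespace(
    estimate=24.8,
    error_estimate=0.3,
    last_timestamp=1700000000.0,
)
reset_filter(stand_in)
```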
Seeding with Known Value
If you know the approximate starting value:
@apply_kalman_filter(
    initial_estimate=25.0,       # Expected starting temperature
    initial_error_estimate=1.0   # Confidence in initial estimate
)
def read_temperature(self):
    return self._sensor.read()
Comparison: Raw vs Filtered
import random
import time
from pydoover.utils import apply_kalman_filter

class SensorDemo:
    def __init__(self):
        self.raw_values = []
        self.filtered_values = []
        self.last_raw = None

    def read_raw(self):
        """Simulated noisy sensor"""
        self.last_raw = 25.0 + random.gauss(0, 2)  # 25 degrees with noise
        return self.last_raw

    @apply_kalman_filter(process_variance=0.5)
    def read_filtered(self):
        return self.read_raw()

    def demo(self, n=50):
        for _ in range(n):
            # Take one raw sample per iteration so the raw and filtered
            # values describe the same reading
            filtered = self.read_filtered()
            raw = self.last_raw
            self.raw_values.append(raw)
            self.filtered_values.append(filtered)
            print(f"Raw: {raw:.2f}, Filtered: {filtered:.2f}")
            time.sleep(0.1)

# Run demo
demo = SensorDemo()
demo.demo()
Related Pages
- Utilities Overview - Overview of all utility functions
- PID Controller - Use filtered data with PID control
- Platform Interface - Reading analog inputs