Camera Interface
The camera interface (camera_iface) is a device bridge service that provides RTSP camera support for Doover devices. It enables applications to capture snapshots, stream video, and integrate with IP cameras.
Overview
The camera_iface service runs as a Docker container and exposes a gRPC service on port 50050. It supports:
- RTSP camera connections
- Snapshot capture
- Dahua camera integration
- Camera power management
Service Configuration
The camera interface container is configured through environment variables:
| Variable | Description | Default |
|---|---|---|
| CAMERA_PORT | gRPC service port | 50050 |
| CAMERA_RTSP_URL | Default RTSP stream URL | None |
| CAMERA_USERNAME | Camera authentication username | None |
| CAMERA_PASSWORD | Camera authentication password | None |
RTSP Camera Support
The camera interface supports standard RTSP cameras. Most IP cameras provide an RTSP stream URL that follows this pattern:
rtsp://username:password@camera-ip:554/stream
The exact path varies by camera manufacturer. Common formats include:
| Manufacturer | RTSP URL Format |
|---|---|
| Dahua | rtsp://user:pass@ip:554/cam/realmonitor?channel=1&subtype=0 |
| Hikvision | rtsp://user:pass@ip:554/Streaming/Channels/101 |
| Axis | rtsp://user:pass@ip:554/axis-media/media.amp |
| Generic | rtsp://user:pass@ip:554/stream |
Snapshot Capture
The primary use case for the camera interface is capturing snapshots from RTSP streams. Snapshots can be triggered programmatically and uploaded to the Doover cloud.
Basic Snapshot Example
from pydoover.docker import Application, run_app
from pydoover.config import Schema, String
class CameraConfig(Schema):
def __init__(self):
self.camera_url = String(
"Camera URL",
default="rtsp://admin:password@192.168.1.100:554/stream"
)
self.snapshot_interval = Integer(
"Snapshot Interval (seconds)",
default=300,
min_val=60
)
class CameraApp(Application):
config: CameraConfig
def setup(self):
self.last_snapshot = 0
def main_loop(self):
import time
current_time = time.time()
# Capture snapshot at configured interval
if current_time - self.last_snapshot >= self.config.snapshot_interval.value:
self.capture_and_upload_snapshot()
self.last_snapshot = current_time
def capture_and_upload_snapshot(self):
# Request snapshot from camera interface
# Implementation depends on your specific camera setup
print(f"Capturing snapshot from {self.config.camera_url.value}")
if __name__ == "__main__":
run_app(CameraApp(config=CameraConfig()))
Dahua Camera Integration
The camera interface includes specific support for Dahua cameras, which are commonly used in industrial and commercial applications.
Dahua Features
- PTZ Control - Pan, tilt, and zoom control for PTZ cameras
- Event Integration - Motion detection and alarm event handling
- Multi-Channel - Support for multi-channel NVRs
- Snapshot API - Direct snapshot capture without RTSP streaming
Dahua RTSP URL Format
Dahua cameras use a specific RTSP URL format:
rtsp://username:password@camera-ip:554/cam/realmonitor?channel=1&subtype=0
Parameters:
channel- Camera channel (1 for single-channel cameras)subtype- Stream type (0 for main stream, 1 for sub stream)
Dahua Configuration Example
class DahuaCameraConfig(Schema):
def __init__(self):
self.camera_ip = String("Camera IP", default="192.168.1.100")
self.camera_user = String("Username", default="admin")
self.camera_pass = String("Password", default="")
self.channel = Integer("Channel", default=1, min_val=1)
self.stream_type = Enum(
"Stream Type",
options=["Main Stream", "Sub Stream"],
default="Sub Stream"
)
def get_rtsp_url(self):
subtype = 0 if self.stream_type.value == "Main Stream" else 1
return (
f"rtsp://{self.camera_user.value}:{self.camera_pass.value}"
f"@{self.camera_ip.value}:554/cam/realmonitor"
f"?channel={self.channel.value}&subtype={subtype}"
)
Power Management
For PoE (Power over Ethernet) cameras, the camera interface can manage camera power through compatible PoE switches or relay-controlled power.
Power Control Use Cases
- Scheduled Operation - Power cameras on only during business hours
- On-Demand Capture - Power on camera, capture snapshot, power off
- Power Cycling - Reset unresponsive cameras
Power Control Example
class PowerManagedCameraApp(Application):
def setup(self):
# Camera is powered via relay on DO0
self.camera_power_pin = 0
# Ensure camera is powered on
self.platform_iface.set_do(self.camera_power_pin, True)
def capture_with_power_management(self):
# Power on camera
self.platform_iface.set_do(self.camera_power_pin, True)
# Wait for camera to boot (adjust based on your camera)
import time
time.sleep(30)
# Capture snapshot
self.capture_snapshot()
# Schedule power off in 60 seconds
self.platform_iface.schedule_do(self.camera_power_pin, False, 60)
def capture_snapshot(self):
# Snapshot capture implementation
pass
Network Configuration
Camera Network Requirements
- Camera must be accessible from the Doover device
- RTSP port (typically 554) must be open
- Sufficient bandwidth for video streaming
Common Network Topologies
Direct Connection
Doover Device --- Ethernet --- IP Camera
Switched Network
Doover Device --- Switch --- IP Camera 1
|
+------- IP Camera 2
With PoE Switch
Doover Device --- PoE Switch --- PoE Camera
|
+---------- Power + Data
Troubleshooting
Camera Not Connecting
- Verify network connectivity - Ping the camera IP from the device
- Check RTSP URL - Test URL with VLC or ffmpeg
- Verify credentials - Confirm username and password
- Check firewall - Ensure RTSP port is not blocked
Testing RTSP Stream
From a device shell:
# Test RTSP stream with ffmpeg ffmpeg -i "rtsp://admin:password@192.168.1.100:554/stream" -frames:v 1 test.jpg # Check if camera is responding curl -I http://192.168.1.100
Snapshot Quality Issues
- Use the main stream (subtype=0) for higher quality snapshots
- Check camera resolution settings
- Verify network bandwidth is sufficient
Camera Connection Timeouts
- Increase connection timeout in camera configuration
- Check for network congestion
- Verify camera is not overloaded with connections
Best Practices
Security
- Use strong passwords for camera authentication
- Isolate cameras on a separate VLAN if possible
- Disable unnecessary camera services (FTP, SMTP, etc.)
- Keep camera firmware updated
Reliability
- Use sub-stream for frequent monitoring, main stream for snapshots
- Implement retry logic for failed captures
- Monitor camera health and connection status
- Consider redundant cameras for critical applications
Performance
- Limit concurrent camera connections
- Use appropriate resolution for your needs
- Consider snapshot interval based on bandwidth
- Close streams when not actively capturing
Integration Example
Here is a complete example integrating camera capture with other Doover features:
from pydoover.docker import Application, run_app
from pydoover.config import Schema, String, Integer, Boolean
import time
class SurveillanceConfig(Schema):
def __init__(self):
self.camera_url = String(
"Camera RTSP URL",
default="rtsp://admin:password@192.168.1.100:554/stream"
)
self.snapshot_interval = Integer(
"Snapshot Interval (minutes)",
default=5,
min_val=1
)
self.motion_enabled = Boolean(
"Enable Motion Trigger",
default=True
)
self.motion_input = Integer(
"Motion Sensor DI Pin",
default=0,
min_val=0
)
class SurveillanceApp(Application):
config: SurveillanceConfig
def setup(self):
self.last_scheduled_capture = 0
self.last_motion_capture = 0
self.motion_cooldown = 60 # Minimum seconds between motion captures
# Set up motion detection
if self.config.motion_enabled.value:
self.motion_counter = self.platform_iface.get_new_pulse_counter(
di=self.config.motion_input.value,
edge="rising",
callback=self.on_motion_detected
)
async def on_motion_detected(self, pin, value, dt_secs, count, edge):
"""Handle motion detection events."""
current_time = time.time()
# Check cooldown period
if current_time - self.last_motion_capture < self.motion_cooldown:
return
print(f"Motion detected on pin {pin}")
self.capture_snapshot("motion")
self.last_motion_capture = current_time
# Publish motion event
self.device_agent.publish_to_channel("events", {
"type": "motion",
"timestamp": current_time,
"sensor": pin
})
def main_loop(self):
current_time = time.time()
interval_seconds = self.config.snapshot_interval.value * 60
# Scheduled snapshot capture
if current_time - self.last_scheduled_capture >= interval_seconds:
self.capture_snapshot("scheduled")
self.last_scheduled_capture = current_time
# Publish camera status
self.set_tag("camera_online", self.check_camera_status())
def capture_snapshot(self, trigger_type):
"""Capture and upload a snapshot."""
print(f"Capturing {trigger_type} snapshot from {self.config.camera_url.value}")
# Implementation would capture from RTSP and upload to cloud
# This depends on your specific requirements
self.device_agent.publish_to_channel("snapshots", {
"trigger": trigger_type,
"timestamp": time.time(),
"camera": self.config.camera_url.value
})
def check_camera_status(self):
"""Check if camera is responding."""
# Implementation would test camera connectivity
return True
if __name__ == "__main__":
run_app(SurveillanceApp(config=SurveillanceConfig()))
Next Steps
- Device Bridges Overview - Bridge architecture
- Device Services - DDS service details
- PlatformInterface - Hardware I/O for power control
- DeviceAgentInterface - Cloud publishing for snapshots