Camera Interface

The camera interface (camera_iface) is a device bridge service that provides RTSP camera support for Doover devices. It enables applications to capture snapshots, stream video, and integrate with IP cameras.

Overview

The camera_iface service runs as a Docker container and exposes a gRPC service on port 50050 by default (configurable through CAMERA_PORT). It supports:

  • RTSP camera connections
  • Snapshot capture
  • Dahua camera integration
  • Camera power management

Service Configuration

The camera interface container is configured through environment variables:

Variable           Description                       Default
CAMERA_PORT        gRPC service port                 50050
CAMERA_RTSP_URL    Default RTSP stream URL           None
CAMERA_USERNAME    Camera authentication username    None
CAMERA_PASSWORD    Camera authentication password    None
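
To make the defaults concrete, here is a minimal sketch of how these variables and their defaults could be resolved in Python. The names CameraSettings and load_camera_settings are hypothetical and are not part of camera_iface or pydoover; the container's own parsing is not shown in this document and may differ.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class CameraSettings:
    """Hypothetical container for the documented environment variables."""
    port: int
    rtsp_url: Optional[str]
    username: Optional[str]
    password: Optional[str]


def load_camera_settings(env: Mapping[str, str] = os.environ) -> CameraSettings:
    """Read the documented variables, falling back to the documented defaults."""
    return CameraSettings(
        port=int(env.get("CAMERA_PORT", "50050")),
        # Unset or empty values are treated as "None", matching the table.
        rtsp_url=env.get("CAMERA_RTSP_URL") or None,
        username=env.get("CAMERA_USERNAME") or None,
        password=env.get("CAMERA_PASSWORD") or None,
    )
```

Calling load_camera_settings({}) yields port 50050 with every other field unset, which mirrors the Default column.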

RTSP Camera Support

The camera interface supports standard RTSP cameras. Most IP cameras provide an RTSP stream URL that follows this pattern:

rtsp://username:password@camera-ip:554/stream

The exact path varies by camera manufacturer. Common formats include:

Manufacturer    RTSP URL Format
Dahua           rtsp://user:pass@ip:554/cam/realmonitor?channel=1&subtype=0
Hikvision       rtsp://user:pass@ip:554/Streaming/Channels/101
Axis            rtsp://user:pass@ip:554/axis-media/media.amp
Generic         rtsp://user:pass@ip:554/stream
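
The formats above can be assembled programmatically. The following is a small sketch, not part of the camera interface: RTSP_PATHS and build_rtsp_url are hypothetical names, and percent-encoding the credentials is an assumption that suits most RTSP clients when a password contains characters such as "@" or ":".

```python
from urllib.parse import quote

# Path portion of each documented URL format, keyed by manufacturer.
RTSP_PATHS = {
    "dahua": "/cam/realmonitor?channel=1&subtype=0",
    "hikvision": "/Streaming/Channels/101",
    "axis": "/axis-media/media.amp",
    "generic": "/stream",
}


def build_rtsp_url(manufacturer, host, username, password, port=554):
    """Build an RTSP URL for one of the manufacturers listed above."""
    path = RTSP_PATHS[manufacturer.lower()]  # KeyError for unknown manufacturers
    # Percent-encode credentials so reserved characters do not break the URL.
    user = quote(username, safe="")
    pwd = quote(password, safe="")
    return f"rtsp://{user}:{pwd}@{host}:{port}{path}"


print(build_rtsp_url("Hikvision", "192.168.1.100", "admin", "p@ss"))
```

Confirm the resulting URL against your camera's documentation, since paths can differ between models and firmware versions.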

Snapshot Capture

The primary use case for the camera interface is capturing snapshots from RTSP streams. Snapshots can be triggered programmatically and uploaded to the Doover cloud.

Basic Snapshot Example

from pydoover.docker import Application, run_app
from pydoover.config import Schema, String, Integer

class CameraConfig(Schema):
    def __init__(self):
        self.camera_url = String(
            "Camera URL",
            default="rtsp://admin:password@192.168.1.100:554/stream"
        )
        self.snapshot_interval = Integer(
            "Snapshot Interval (seconds)",
            default=300,
            min_val=60
        )

class CameraApp(Application):
    config: CameraConfig

    def setup(self):
        self.last_snapshot = 0

    def main_loop(self):
        import time
        current_time = time.time()

        # Capture snapshot at configured interval
        if current_time - self.last_snapshot >= self.config.snapshot_interval.value:
            self.capture_and_upload_snapshot()
            self.last_snapshot = current_time

    def capture_and_upload_snapshot(self):
        # Request snapshot from camera interface
        # Implementation depends on your specific camera setup
        print(f"Capturing snapshot from {self.config.camera_url.value}")

if __name__ == "__main__":
    run_app(CameraApp(config=CameraConfig()))

Dahua Camera Integration

The camera interface includes specific support for Dahua cameras, which are commonly used in industrial and commercial applications.

Dahua Features

  • PTZ Control - Pan, tilt, and zoom control for PTZ cameras
  • Event Integration - Motion detection and alarm event handling
  • Multi-Channel - Support for multi-channel NVRs
  • Snapshot API - Direct snapshot capture without RTSP streaming

Dahua RTSP URL Format

Dahua cameras use a specific RTSP URL format:

rtsp://username:password@camera-ip:554/cam/realmonitor?channel=1&subtype=0

Parameters:

  • channel - Camera channel (1 for single-channel cameras)
  • subtype - Stream type (0 for main stream, 1 for sub stream)

Dahua Configuration Example

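# Enum is assumed to be exported from pydoover.config alongside the other types
from pydoover.config import Schema, String, Integer, Enum

class DahuaCameraConfig(Schema):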
    def __init__(self):
        self.camera_ip = String("Camera IP", default="192.168.1.100")
        self.camera_user = String("Username", default="admin")
        self.camera_pass = String("Password", default="")
        self.channel = Integer("Channel", default=1, min_val=1)
        self.stream_type = Enum(
            "Stream Type",
            options=["Main Stream", "Sub Stream"],
            default="Sub Stream"
        )

    def get_rtsp_url(self):
        subtype = 0 if self.stream_type.value == "Main Stream" else 1
        return (
            f"rtsp://{self.camera_user.value}:{self.camera_pass.value}"
            f"@{self.camera_ip.value}:554/cam/realmonitor"
            f"?channel={self.channel.value}&subtype={subtype}"
        )

Power Management

For PoE (Power over Ethernet) cameras, the camera interface can manage camera power through compatible PoE switches or relay-controlled power.

Power Control Use Cases

  • Scheduled Operation - Power cameras on only during business hours
  • On-Demand Capture - Power on camera, capture snapshot, power off
  • Power Cycling - Reset unresponsive cameras
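
For the scheduled operation case, the decision of whether the camera should currently be powered can be kept separate from the code that switches the output. This is a minimal sketch under assumed business hours of 08:00 to 18:00; camera_should_be_powered is a hypothetical helper, not a pydoover function.

```python
from datetime import datetime, time as dtime


def camera_should_be_powered(now, start=dtime(8, 0), end=dtime(18, 0)):
    """Return True if `now` falls inside the [start, end) power window."""
    t = now.time()
    if start <= end:
        return start <= t < end
    # The window wraps past midnight, e.g. 22:00 to 06:00.
    return t >= start or t < end


print(camera_should_be_powered(datetime.now()))
```

An application could call this from its main loop and pass the result to whatever drives the relay or PoE port.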

Power Control Example

from pydoover.docker import Application

class PowerManagedCameraApp(Application):
    def setup(self):
        # Camera is powered via relay on DO0
        self.camera_power_pin = 0

        # Ensure camera is powered on
        self.platform_iface.set_do(self.camera_power_pin, True)

    def capture_with_power_management(self):
        # Power on camera
        self.platform_iface.set_do(self.camera_power_pin, True)

        # Wait for camera to boot (adjust based on your camera)
        import time
        time.sleep(30)

        # Capture snapshot
        self.capture_snapshot()

        # Schedule power off in 60 seconds
        self.platform_iface.schedule_do(self.camera_power_pin, False, 60)

    def capture_snapshot(self):
        # Snapshot capture implementation
        pass

Network Configuration

Camera Network Requirements

  • Camera must be accessible from the Doover device
  • RTSP port (typically 554) must be open
  • The network must have sufficient bandwidth for video streaming

Common Network Topologies

Direct Connection

Doover Device --- Ethernet --- IP Camera

Switched Network

Doover Device --- Switch --- IP Camera 1
                    |
                    +------- IP Camera 2

With PoE Switch

Doover Device --- PoE Switch --- PoE Camera
                     |
                     +---------- Power + Data

Troubleshooting

Camera Not Connecting

  1. Verify network connectivity - Ping the camera IP from the device
  2. Check RTSP URL - Test URL with VLC or ffmpeg
  3. Verify credentials - Confirm username and password
  4. Check firewall - Ensure RTSP port is not blocked
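
Steps 1 and 4 can be partly automated by attempting a TCP connection to the RTSP port. The sketch below uses only the Python standard library; rtsp_port_open is a hypothetical helper name. A successful connection shows only that the port is reachable, not that the URL path or credentials are valid.

```python
import socket


def rtsp_port_open(host, port=554, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

For example, rtsp_port_open("192.168.1.100") checks the default RTSP port on the camera address used throughout this page.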

Testing RTSP Stream

From a device shell:

# Test RTSP stream with ffmpeg
ffmpeg -i "rtsp://admin:password@192.168.1.100:554/stream" -frames:v 1 test.jpg

# Check whether the camera's web interface responds over HTTP
curl -I http://192.168.1.100
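
The same ffmpeg test can be driven from Python, which is one possible way to implement a snapshot capture. This sketch assumes ffmpeg is installed on the device; build_snapshot_command and capture_snapshot are hypothetical helpers and do not describe how camera_iface captures frames internally.

```python
import subprocess


def build_snapshot_command(rtsp_url, output_path):
    """Build an ffmpeg command that saves a single frame from an RTSP stream."""
    return [
        "ffmpeg",
        "-y",                      # overwrite the output file if it exists
        "-rtsp_transport", "tcp",  # TCP transport is often more reliable than UDP
        "-i", rtsp_url,
        "-frames:v", "1",          # stop after one video frame
        output_path,
    ]


def capture_snapshot(rtsp_url, output_path, timeout=30.0):
    """Run ffmpeg and return True if it exited successfully."""
    try:
        result = subprocess.run(
            build_snapshot_command(rtsp_url, output_path),
            capture_output=True,
            timeout=timeout,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError):
        # ffmpeg hung past the timeout or is not installed.
        return False
    return result.returncode == 0
```

Passing the command as a list avoids shell quoting problems when the URL contains characters such as "&".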

Snapshot Quality Issues

  • Use the main stream (subtype=0) for higher quality snapshots
  • Check camera resolution settings
  • Verify network bandwidth is sufficient

Camera Connection Timeouts

  • Increase connection timeout in camera configuration
  • Check for network congestion
  • Verify camera is not overloaded with connections

Best Practices

Security

  • Use strong passwords for camera authentication
  • Isolate cameras on a separate VLAN if possible
  • Disable unnecessary camera services (FTP, SMTP, etc.)
  • Keep camera firmware updated

Reliability

  • Use sub-stream for frequent monitoring, main stream for snapshots
  • Implement retry logic for failed captures
  • Monitor camera health and connection status
  • Consider redundant cameras for critical applications
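
Retry logic for failed captures could look like the following sketch. capture_with_retry is a hypothetical helper; it expects any zero-argument callable that returns True on success, and the attempt count and delays are illustrative values to tune for your camera.

```python
import time


def capture_with_retry(capture, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call `capture` until it succeeds, doubling the delay between attempts."""
    for attempt in range(attempts):
        try:
            if capture():
                return True
        except Exception as exc:
            # Treat an exception the same as a failed capture.
            print(f"Capture attempt {attempt + 1} raised: {exc}")
        if attempt < attempts - 1:
            # Exponential backoff: base_delay, then 2x, then 4x, ...
            sleep(base_delay * 2 ** attempt)
    return False
```

The sleep parameter exists so that the delay can be replaced in tests. Note that the default time.sleep blocks the caller, so keep the total delay short if this runs inside an application's main loop.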

Performance

  • Limit concurrent camera connections
  • Use appropriate resolution for your needs
  • Choose a snapshot interval that fits the available bandwidth
  • Close streams when not actively capturing
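
When choosing a snapshot interval, a rough estimate of the average upload bandwidth can help. The sketch below is simple arithmetic; average_upload_kbps is a hypothetical helper and the snapshot size is an illustrative figure, not a measured one.

```python
def average_upload_kbps(snapshot_kb, interval_seconds, cameras=1):
    """Estimate average upload bandwidth in kilobits per second."""
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    # 1 kilobyte = 8 kilobits; spread the upload evenly over the interval.
    return snapshot_kb * 8 * cameras / interval_seconds


# Four cameras, each uploading an assumed 500 KB snapshot every 5 minutes
print(round(average_upload_kbps(500, 300, cameras=4), 1))
```

This is an average only. Each upload is a short burst, so the link also needs enough headroom to send a whole snapshot in an acceptable time.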

Integration Example

Here is a complete example integrating camera capture with other Doover features:

from pydoover.docker import Application, run_app
from pydoover.config import Schema, String, Integer, Boolean
import time

class SurveillanceConfig(Schema):
    def __init__(self):
        self.camera_url = String(
            "Camera RTSP URL",
            default="rtsp://admin:password@192.168.1.100:554/stream"
        )
        self.snapshot_interval = Integer(
            "Snapshot Interval (minutes)",
            default=5,
            min_val=1
        )
        self.motion_enabled = Boolean(
            "Enable Motion Trigger",
            default=True
        )
        self.motion_input = Integer(
            "Motion Sensor DI Pin",
            default=0,
            min_val=0
        )

class SurveillanceApp(Application):
    config: SurveillanceConfig

    def setup(self):
        self.last_scheduled_capture = 0
        self.last_motion_capture = 0
        self.motion_cooldown = 60  # Minimum seconds between motion captures

        # Set up motion detection
        if self.config.motion_enabled.value:
            self.motion_counter = self.platform_iface.get_new_pulse_counter(
                di=self.config.motion_input.value,
                edge="rising",
                callback=self.on_motion_detected
            )

    async def on_motion_detected(self, pin, value, dt_secs, count, edge):
        """Handle motion detection events."""
        current_time = time.time()

        # Check cooldown period
        if current_time - self.last_motion_capture < self.motion_cooldown:
            return

        print(f"Motion detected on pin {pin}")
        self.capture_snapshot("motion")
        self.last_motion_capture = current_time

        # Publish motion event
        self.device_agent.publish_to_channel("events", {
            "type": "motion",
            "timestamp": current_time,
            "sensor": pin
        })

    def main_loop(self):
        current_time = time.time()
        interval_seconds = self.config.snapshot_interval.value * 60

        # Scheduled snapshot capture
        if current_time - self.last_scheduled_capture >= interval_seconds:
            self.capture_snapshot("scheduled")
            self.last_scheduled_capture = current_time

        # Publish camera status
        self.set_tag("camera_online", self.check_camera_status())

    def capture_snapshot(self, trigger_type):
        """Capture and upload a snapshot."""
        print(f"Capturing {trigger_type} snapshot from {self.config.camera_url.value}")

        # Implementation would capture from RTSP and upload to cloud
        # This depends on your specific requirements

        self.device_agent.publish_to_channel("snapshots", {
            "trigger": trigger_type,
            "timestamp": time.time(),
            "camera": self.config.camera_url.value
        })

    def check_camera_status(self):
        """Check if camera is responding."""
        # Implementation would test camera connectivity
        return True

if __name__ == "__main__":
    run_app(SurveillanceApp(config=SurveillanceConfig()))

Next Steps