Client Class Reference

The Client class is the main interface for interacting with the Doover Cloud API. It handles authentication and token management, and provides methods for all API operations.

Constructor

from pydoover.cloud.api import Client

client = Client(
    username: str = None,
    password: str = None,
    token: str = None,
    token_expires: datetime = None,
    base_url: str = "https://my.doover.dev",
    agent_id: str = None,
    verify: bool = True,
    login_callback: Callable = None,
    config_profile: str = "default",
    debug: bool = False,
)

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| username | str | No | Username (email) for authentication |
| password | str | No | Password for authentication |
| token | str | No | Pre-existing access token |
| token_expires | datetime | No | Token expiration time (UTC) |
| base_url | str | No | API base URL. Default: https://my.doover.dev |
| agent_id | str | No | Default agent ID for operations |
| verify | bool | No | Verify SSL certificates. Default: True |
| login_callback | Callable | No | Callback function invoked after successful login |
| config_profile | str | No | Configuration profile name. Default: "default" |
| debug | bool | No | Enable debug logging. Default: False |
Authentication Priority

If neither username/password nor token is provided, the client will attempt to load credentials from the stored configuration profile.

Properties

| Property | Type | Description |
|---|---|---|
| access_token | AccessToken | Named tuple containing token and expires_at |
| agent_id | str | Current agent ID |
| base_url | str | API base URL |
| session | requests.Session | HTTP session for requests |
| request_retries | int | Number of retries for GET requests (default: 1) |
| request_timeout | int | Request timeout in seconds (default: 59) |
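
Since access_token carries an expires_at value, a script can check how close the token is to expiry before making a long series of calls. The following is a minimal sketch, not part of pydoover: the AccessToken class here is a local stand-in that only mirrors the two documented fields, needs_refresh and its 60-second margin are hypothetical, and expires_at is assumed to be a timezone-aware UTC datetime.

```python
from datetime import datetime, timedelta, timezone
from typing import NamedTuple


class AccessToken(NamedTuple):
    """Local stand-in mirroring the documented fields; not imported from pydoover."""
    token: str
    expires_at: datetime


def needs_refresh(access_token, now=None, margin=timedelta(seconds=60)):
    """Return True if the token expires within `margin` of `now`.

    Assumes `expires_at` and `now` are both timezone-aware UTC datetimes.
    """
    now = now or datetime.now(timezone.utc)
    return access_token.expires_at - now <= margin
```

With a real client, something like `if needs_refresh(client.access_token): client.login()` would be the intended use, provided the property returns aware datetimes.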

Authentication Methods

login

Authenticates with the Doover API using stored credentials.

client.login()

This method:

  1. Calls fetch_token() to get a new access token
  2. Updates the session headers with the new token
  3. Invokes the login_callback if provided
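
The three steps above can be sketched with a small stand-in class. This is an illustration of the described flow only, not the library's implementation: SketchClient is hypothetical, the headers dict stands in for the session headers, the "Bearer" header format is an assumption, and the callback is assumed to take no arguments.

```python
from datetime import datetime, timedelta, timezone


class SketchClient:
    """Illustrative stand-in; the real Client's internals may differ."""

    def __init__(self, login_callback=None):
        self.headers = {}  # stands in for the HTTP session headers
        self.login_callback = login_callback
        self.access_token = None

    def fetch_token(self):
        # Placeholder values; the real method requests a token from the API.
        expires_at = datetime.now(timezone.utc) + timedelta(hours=1)
        return "example-token", expires_at, "example-agent-id"

    def login(self):
        # Step 1: get a new access token.
        token, expires_at, agent_id = self.fetch_token()
        self.access_token = (token, expires_at)
        # Step 2: update the session headers (header format is assumed).
        self.headers["Authorization"] = f"Bearer {token}"
        # Step 3: invoke the callback, if one was provided.
        if self.login_callback is not None:
            self.login_callback()
```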

fetch_token

Fetches a temporary token from the Doover API.

token, expires_at, agent_id = client.fetch_token()

Returns: Tuple of (token: str, expires_at: datetime, agent_id: str)

Raises: RuntimeError if the username or password is not set

Warning
Two-Factor Authentication

If your account has 2FA enabled, you will be prompted to enter your 2FA code during login. For automated scripts, use a long-lived token instead.
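
For an automated script, one way to supply a long-lived token is to read it from the environment and pass it to the constructor's token and token_expires parameters. The sketch below is an assumption-laden example: the helper client_kwargs_from_env and the variable names DOOVER_TOKEN and DOOVER_TOKEN_EXPIRES are hypothetical, and the expiry is assumed to be stored as an ISO 8601 string with a UTC offset.

```python
import os
from datetime import datetime, timezone


def client_kwargs_from_env(env=None):
    """Build keyword arguments for Client(...) from environment variables.

    DOOVER_TOKEN and DOOVER_TOKEN_EXPIRES are hypothetical variable names.
    """
    env = os.environ if env is None else env
    kwargs = {"token": env["DOOVER_TOKEN"]}  # raises KeyError if the token is missing
    expires = env.get("DOOVER_TOKEN_EXPIRES")  # e.g. 2030-01-01T00:00:00+00:00
    if expires:
        # Normalise to UTC; a value without an offset is treated as local time.
        kwargs["token_expires"] = datetime.fromisoformat(expires).astimezone(timezone.utc)
    return kwargs
```

The result would then be passed on as `Client(**client_kwargs_from_env())`, so no interactive 2FA prompt is needed.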

Agent Methods

get_agent

Fetches an agent by its unique identifier.

agent = client.get_agent(agent_id: str) -> Agent

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| agent_id | str | Yes | The unique identifier (UUID) for the agent |

Returns: Agent object

Raises: NotFound if the agent does not exist or you do not have permission to access it

Example

agent = client.get_agent("9fb5d629-ce7f-4b08-b17a-c267cbcd0427")
print(f"Agent name: {agent.name}")
print(f"Agent type: {agent.type}")
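
Because get_agent raises NotFound for both missing agents and permission problems, callers often want to turn that into an optional result. A minimal sketch follows; get_agent_or_none is a hypothetical helper, and the exception class is passed in as an argument so the snippet does not depend on a particular import path.

```python
def get_agent_or_none(client, agent_id, not_found_exc):
    """Return the agent, or None if the lookup raises `not_found_exc`."""
    try:
        return client.get_agent(agent_id)
    except not_found_exc:
        # Covers both "does not exist" and "no permission", as documented above.
        return None
```

With the real client this would be called as `get_agent_or_none(client, agent_id, NotFound)`, using the NotFound class imported alongside Client.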

get_agent_list

Fetches all agents you have access to.

agents = client.get_agent_list() -> list[Agent]

Returns: List of Agent objects

Example

agents = client.get_agent_list()
for agent in agents:
    print(f"{agent.name}: {agent.key}")
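
When a script needs to look agents up repeatedly, it can help to index the returned list once. This is a small sketch using only the name attribute shown above; agents_by_name is a hypothetical helper, and it assumes agent names are unique enough for the use case.

```python
def agents_by_name(agents):
    """Map agent.name to the agent object; later duplicates overwrite earlier ones."""
    return {agent.name: agent for agent in agents}
```

Usage would look like `index = agents_by_name(client.get_agent_list())` followed by `index["my-agent"].key`.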

Channel Methods

get_channel

Fetches a channel by its unique identifier.

channel = client.get_channel(channel_id: str) -> Channel | Processor | Task

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| channel_id | str | Yes | The unique identifier (UUID) for the channel |

Returns: Channel, Processor, or Task depending on the channel type

Example

channel = client.get_channel("d1c7e8e3-f47b-4c68-86d7-65054d9e97d3")
print(f"Channel: {channel.name}")
print(f"Aggregate: {channel.aggregate}")

get_channel_named

Fetches a channel by its name for a specific agent.

channel = client.get_channel_named(
    channel_name: str,
    agent_id: str
) -> Channel | Processor | Task

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| channel_name | str | Yes | The channel name |
| agent_id | str | Yes | The owner agent's unique identifier |

Returns: Channel, Processor, or Task depending on the channel type

Example

channel = client.get_channel_named("status", "9fb5d629-ce7f-4b08-b17a-c267cbcd0427")

create_channel

Creates a new channel, or returns the existing one if a channel with that name already exists for the agent.

channel = client.create_channel(
    channel_name: str,
    agent_id: str
) -> Channel | Processor | Task

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| channel_name | str | Yes | Name for the channel. Use a # prefix for processors and a ! prefix for tasks |
| agent_id | str | Yes | The owner agent's unique identifier |

Returns: Channel, Processor, or Task depending on the name prefix

Example

# Create a regular channel
channel = client.create_channel("sensor-data", agent.key)

# Create a processor channel
processor = client.create_channel("#my-processor", agent.key)

# Create a task channel (requires existing processor)
task = client.create_task("my-task", agent.key, processor.key)
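
The prefix convention described above can be expressed as a small classifier, which is handy for validating names before calling create_channel. This is only a sketch of the documented naming rule; channel_kind is a hypothetical helper and is not part of the Client class.

```python
def channel_kind(channel_name: str) -> str:
    """Classify a channel name by its prefix: '#' processor, '!' task, else regular."""
    if channel_name.startswith("#"):
        return "processor"
    if channel_name.startswith("!"):
        return "task"
    return "channel"
```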

create_processor

Creates a processor channel.

processor = client.create_processor(
    processor_name: str,
    agent_id: str
) -> Processor

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| processor_name | str | Yes | Name for the processor (with or without # prefix) |
| agent_id | str | Yes | The owner agent's unique identifier |

Returns: Processor object

create_task

Creates a task channel associated with a processor.

task = client.create_task(
    task_name: str,
    agent_id: str,
    processor_id: str
) -> Task

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| task_name | str | Yes | Name for the task (with or without ! prefix) |
| agent_id | str | Yes | The owner agent's unique identifier |
| processor_id | str | Yes | The processor to associate with this task |

Returns: Task object

publish_to_channel

Publishes data to a channel by ID.

client.publish_to_channel(
    channel_id: str,
    data: Any,
    save_log: bool = True,
    log_aggregate: bool = False,
    override_aggregate: bool = False,
    timestamp: datetime = None
)

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| channel_id | str | Yes | The channel's unique identifier |
| data | Any | Yes | Data to publish (dict or string) |
| save_log | bool | No | Save to message log. Default: True |
| log_aggregate | bool | No | Log the aggregate. Default: False |
| override_aggregate | bool | No | Replace existing aggregate. Default: False |
| timestamp | datetime | No | Custom timestamp. Default: current time |

Example

client.publish_to_channel(
    channel_id="d1c7e8e3-f47b-4c68-86d7-65054d9e97d3",
    data={"temperature": 25.5, "humidity": 60},
    save_log=True
)
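
The timestamp parameter is useful when a reading was taken earlier than it is published. Here is a minimal sketch of a wrapper that always sends an explicit timestamp; publish_reading is a hypothetical helper, and the use of timezone-aware UTC datetimes is an assumption rather than a documented requirement.

```python
from datetime import datetime, timezone


def publish_reading(client, channel_id, reading, measured_at=None):
    """Publish `reading` with an explicit timestamp (defaults to now, in UTC)."""
    measured_at = measured_at or datetime.now(timezone.utc)
    client.publish_to_channel(
        channel_id,
        data=reading,
        save_log=True,
        timestamp=measured_at,
    )
    # Return the timestamp that was sent so callers can record it.
    return measured_at
```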

publish_to_channel_name

Publishes data to a channel by name.

client.publish_to_channel_name(
    agent_id: str,
    channel_name: str,
    data: Any,
    save_log: bool = True,
    log_aggregate: bool = False,
    override_aggregate: bool = False,
    timestamp: datetime = None
)

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| agent_id | str | Yes | ID of the agent that owns the channel |
| channel_name | str | Yes | The channel name |
| data | Any | Yes | Data to publish (dict or string) |
| save_log | bool | No | Save to message log. Default: True |
| log_aggregate | bool | No | Log the aggregate. Default: False |
| override_aggregate | bool | No | Replace existing aggregate. Default: False |
| timestamp | datetime | No | Custom timestamp. Default: current time |
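
Note that agent_id comes first in this signature, whereas get_channel_named and create_channel take the channel name first. The sketch below shows one way to publish to several named channels of a single agent; publish_many is a hypothetical helper, and the channel names in the usage line are made up.

```python
def publish_many(client, agent_id, payloads):
    """Publish each {channel_name: data} pair; return the channel names published."""
    published = []
    for channel_name, data in payloads.items():
        # Argument order follows the signature above: agent_id, then channel_name.
        client.publish_to_channel_name(agent_id, channel_name, data)
        published.append(channel_name)
    return published
```

For example: `publish_many(client, agent.key, {"status": {"online": True}, "sensor-data": {"temperature": 25.5}})`.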

Message Methods

get_channel_messages

Fetches messages from a channel.

messages = client.get_channel_messages(
    channel_id: str,
    num_messages: int = None
) -> list[Message]

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| channel_id | str | Yes | The channel's unique identifier |
| num_messages | int | No | Number of messages to fetch. Default: all |

Returns: List of Message objects

Example

messages = client.get_channel_messages("channel-uuid", num_messages=10)
for msg in messages:
    print(f"[{msg.timestamp}] {msg.fetch_payload()}")
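
If only the payloads are needed, the loop above can be wrapped in a helper that returns plain data. This is a sketch built on the two members used in the example (timestamp and fetch_payload()); recent_payloads is a hypothetical name.

```python
def recent_payloads(client, channel_id, count=10):
    """Return (timestamp, payload) pairs for up to `count` messages."""
    messages = client.get_channel_messages(channel_id, num_messages=count)
    return [(msg.timestamp, msg.fetch_payload()) for msg in messages]
```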

get_channel_messages_in_window

Fetches messages within a specific time window.

messages = client.get_channel_messages_in_window(
    channel_id: str,
    start: datetime,
    end: datetime
) -> list[Message]

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| channel_id | str | Yes | The channel's unique identifier |
| start | datetime | Yes | Start of the time window |
| end | datetime | Yes | End of the time window |

Returns: List of Message objects

Example

from datetime import datetime, timedelta

end = datetime.now()
start = end - timedelta(hours=24)

messages = client.get_channel_messages_in_window("channel-uuid", start, end)
print(f"Found {len(messages)} messages in the last 24 hours")
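
For long time ranges it may be preferable to query in smaller windows rather than in one large request. The following sketch splits a range into consecutive chunks that can each be passed to get_channel_messages_in_window; split_window is a hypothetical helper. Whether the API treats the window bounds as inclusive is not stated here, so messages that fall exactly on a chunk boundary might need de-duplicating.

```python
from datetime import datetime, timedelta


def split_window(start, end, step):
    """Yield consecutive (chunk_start, chunk_end) pairs covering start..end."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    cursor = start
    while cursor < end:
        # The final chunk is shortened so it never runs past `end`.
        chunk_end = min(cursor + step, end)
        yield cursor, chunk_end
        cursor = chunk_end
```

Each pair would then be used as `client.get_channel_messages_in_window(channel_id, chunk_start, chunk_end)`.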

get_message

Fetches a specific message by ID.

message = client.get_message(
    channel_id: str,
    message_id: str
) -> Message

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| channel_id | str | Yes | The channel's unique identifier |
| message_id | str | Yes | The message's unique identifier |

Returns: Message object

Raises: NotFound if the message does not exist

Subscription Methods

subscribe_to_channel

Subscribes a task to a channel.

result = client.subscribe_to_channel(
    channel_id: str,
    task_id: str
) -> bool

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| channel_id | str | Yes | The channel to subscribe to |
| task_id | str | Yes | The task to subscribe |

Returns: True if successful

unsubscribe_from_channel

Unsubscribes a task from a channel.

result = client.unsubscribe_from_channel(
    channel_id: str,
    task_id: str
) -> bool

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| channel_id | str | Yes | The channel to unsubscribe from |
| task_id | str | Yes | The task to unsubscribe |

Returns: True if successful
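
A task is often subscribed to more than one channel. The sketch below loops over a list of channel IDs and collects any for which the call did not return True; subscribe_task_to_channels is a hypothetical helper, and it assumes failures are signalled by a falsy return value rather than an exception.

```python
def subscribe_task_to_channels(client, task_id, channel_ids):
    """Subscribe `task_id` to each channel; return the IDs that did not succeed."""
    failed = []
    for channel_id in channel_ids:
        # Argument order follows the signature above: channel_id, then task_id.
        if not client.subscribe_to_channel(channel_id, task_id):
            failed.append(channel_id)
    return failed
```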

Tunnel Methods

get_tunnels

Gets all tunnels for an agent.

tunnels = client.get_tunnels(
    agent_id: str,
    show_choices: bool = False
)

get_tunnel

Gets a specific tunnel by ID.

tunnel = client.get_tunnel(tunnel_id: str)

create_tunnel

Creates a new tunnel for an agent.

tunnel = client.create_tunnel(agent_id: str, **data)

activate_tunnel

Activates a tunnel.

client.activate_tunnel(tunnel_id: str)

deactivate_tunnel

Deactivates a tunnel.

client.deactivate_tunnel(tunnel_id: str)

delete_tunnel

Deletes a tunnel.

client.delete_tunnel(tunnel_id: str)

patch_tunnel

Updates tunnel configuration.

client.patch_tunnel(tunnel_id: str, **data)
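
Since tunnels are activated and deactivated explicitly, a context manager can help ensure a tunnel is not left active if the code using it fails. This is a minimal sketch using only activate_tunnel and deactivate_tunnel as documented above; active_tunnel is a hypothetical helper, and the tunnel ID is assumed to be known already.

```python
from contextlib import contextmanager


@contextmanager
def active_tunnel(client, tunnel_id):
    """Activate a tunnel for the duration of a with-block, then deactivate it."""
    client.activate_tunnel(tunnel_id)
    try:
        yield tunnel_id
    finally:
        # Runs even if the body of the with-block raises.
        client.deactivate_tunnel(tunnel_id)
```

Usage would look like `with active_tunnel(client, tunnel_id): ...`, with the remote work done inside the block.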

Application Methods

get_applications

Gets all available applications.

apps = client.get_applications()

get_application

Gets a specific application by key.

app = client.get_application(key: str) -> Application

create_application

Creates a new application.

key = client.create_application(
    application: Application,
    is_staging: bool = False
) -> str

Returns: The created application's key

update_application

Updates an existing application.

client.update_application(
    application: Application,
    is_staging: bool = False
)

Complete Example

from pydoover.cloud.api import Client, NotFound
from datetime import datetime, timedelta

# Initialize client
client = Client(config_profile="production")

# List all agents
agents = client.get_agent_list()
print(f"Found {len(agents)} agents")

# Work with a specific agent
agent = agents[0]
print(f"Working with agent: {agent.name}")

# Create or get a channel
channel = client.create_channel("sensor-readings", agent.key)

# Publish some data
client.publish_to_channel(
    channel.key,
    data={
        "temperature": 23.5,
        "humidity": 45,
        "timestamp": datetime.now().isoformat()
    }
)

# Read back recent messages
messages = client.get_channel_messages(channel.key, num_messages=5)
for msg in messages:
    print(f"Message at {msg.timestamp}: {msg.fetch_payload()}")

# Get messages from the last hour
end = datetime.now()
start = end - timedelta(hours=1)
recent = client.get_channel_messages_in_window(channel.key, start, end)
print(f"Found {len(recent)} messages in the last hour")