Application Lifecycle

Every pydoover application follows a defined lifecycle that ensures proper initialization, continuous operation, and graceful shutdown. Understanding this lifecycle is essential for building robust applications.

Lifecycle Overview

The application lifecycle consists of five main phases:

  1. Initialization - Application instantiation and configuration
  2. Waiting for DDA - Blocking until the Device Agent (DDA) is available
  3. Setup - One-time initialization before the main loop
  4. Main Loop - Continuous execution of application logic
  5. Shutdown - Graceful cleanup when the application exits

Starting an Application

Applications are started using the run_app() function:

from pydoover.docker import Application, run_app
from pydoover.config import Schema

class MyApp(Application):
    def setup(self):
        pass

    def main_loop(self):
        pass

if __name__ == "__main__":
    run_app(MyApp(config=Schema()))

The run_app Function

The run_app() function handles all application bootstrapping:

def run_app(
    app: Application,
    start: bool = True,
    setup_logging: bool = True,
    log_formatter: logging.Formatter = None,
    log_filters: logging.Filter | list[logging.Filter] = None,
)

| Parameter | Type | Description |
| --- | --- | --- |
| app | Application | The application instance to run |
| start | bool | If True, runs in blocking mode. If False, returns an async runner |
| setup_logging | bool | Whether to configure logging automatically |
| log_formatter | logging.Formatter | Custom log formatter (optional) |
| log_filters | logging.Filter or list[logging.Filter] | Custom log filters (optional) |

When run_app() is called, it:

  1. Parses command-line arguments and environment variables
  2. Determines if the application uses async methods
  3. Configures logging
  4. Sets up URIs for all gRPC interfaces
  5. Starts the async event loop
  6. Enters the application lifecycle
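
Step 2 can be illustrated with the standard library. The following is a minimal sketch, assuming the check looks only at the setup and main_loop hooks; the helper name uses_async_methods and the two demo classes are hypothetical, and pydoover's actual detection logic may differ.

```python
import inspect


class SyncApp:
    """Stand-in application whose hooks are plain functions."""

    def setup(self):
        pass

    def main_loop(self):
        pass


class AsyncApp:
    """Stand-in application whose hooks are coroutines."""

    async def setup(self):
        pass

    async def main_loop(self):
        pass


def uses_async_methods(app) -> bool:
    """Return True if either lifecycle hook is defined with `async def`."""
    return any(
        inspect.iscoroutinefunction(getattr(app, name, None))
        for name in ("setup", "main_loop")
    )
```

Calling uses_async_methods(SyncApp()) yields False and uses_async_methods(AsyncApp()) yields True, which is the kind of answer an is_async=None default would need to resolve.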

Phase 1: Initialization

During initialization, the Application constructor sets up the internal state:

def __init__(
    self,
    config: Schema,
    app_key: str = None,
    is_async: bool = None,
    device_agent: DeviceAgentInterface = None,
    platform_iface: PlatformInterface = None,
    modbus_iface: ModbusInterface = None,
    name: str = None,
    test_mode: bool = False,
    config_fp: str = None,
    healthcheck_port: int = None,
):

Key initialization tasks:

  • Create gRPC interface instances (DeviceAgent, Platform, Modbus)
  • Initialize the UI Manager
  • Set up internal state for tags and subscriptions
  • Configure the loop timing parameters

Configuration Properties

| Property | Default | Description |
| --- | --- | --- |
| loop_target_period | 1 second | Target time between main_loop iterations |
| dda_startup_timeout | 300 seconds | Maximum wait time for DDA availability |

Phase 2: Waiting for DDA

Before setup can proceed, the application waits for the Device Agent to become available:

await self.device_agent.await_dda_available_async(self.dda_startup_timeout)

This ensures cloud connectivity is established before your application logic runs. The wait includes:

  1. Polling the Device Agent via gRPC
  2. Exponential backoff on connection failures
  3. Timeout after dda_startup_timeout seconds
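
The polling, backoff, and timeout behaviour listed above can be sketched without gRPC. This is an illustration only: wait_until_available, probe, initial_delay, and max_delay are hypothetical names, and the real await_dda_available_async may use different delays and error types.

```python
import asyncio
import time


async def wait_until_available(probe, timeout, initial_delay=0.01, max_delay=1.0):
    """Poll `probe` until it returns True, doubling the delay between attempts.

    Raises TimeoutError if `timeout` seconds pass without success.
    """
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        try:
            if await probe():
                return True
        except ConnectionError:
            pass  # a failed connection is treated as "not available yet"
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"service not available after {timeout} seconds")
        # Never sleep past the deadline, and cap the backoff at max_delay.
        await asyncio.sleep(min(delay, remaining))
        delay = min(delay * 2, max_delay)
```

In a real application the probe would be a coroutine that contacts the Device Agent; here any async callable that returns a boolean will do.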

Phase 3: Setup

Once the DDA is available, the setup phase begins. It runs the framework's internal setup first, then your custom setup() method.

Internal Setup

The framework performs several internal setup tasks:

async def _setup(self):
    # Register UI callbacks
    self.ui_manager.register_callbacks(self)
    self.ui_manager.set_display_name(self.app_display_name)

    # Subscribe to tag updates
    self.device_agent.add_subscription(TAG_CHANNEL_NAME, self._on_tag_update)

    # Clear any existing UI
    await self.ui_manager.clear_ui_async()

    # Wait for initial tag values (10 second timeout)
    await asyncio.wait_for(self._tag_ready.wait(), timeout=10.0)

Your Setup Method

After internal setup, your setup() method is called:

class MyApp(Application):
    def setup(self):
        # Initialize UI elements
        self.set_ui_elements([...])

        # Set initial state
        self.set_tag("status", "ready")

        # Start any background tasks
        self.start_monitoring()

Your setup method can be synchronous or asynchronous:

# Synchronous setup
def setup(self):
    self.set_tag("ready", True)

# Asynchronous setup
async def setup(self):
    await self.initialize_sensors()
    self.set_tag("ready", True)

Setup Error Handling

If an error occurs during setup:

  1. The error is logged
  2. The application waits for _error_wait_period seconds (default: 10)
  3. The application restarts

Phase 4: Main Loop

After successful setup, the main loop begins. This is where your application's core logic executes repeatedly.

Main Loop Execution

The main loop runs continuously:

while True:
    # Internal main loop tasks
    await self._main_loop()

    # Your main loop
    await call_maybe_async(self.main_loop)

    # Wait for next iteration
    await self.wait_for_interval(self.loop_target_period)
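
The loop above relies on call_maybe_async to accept either a synchronous or an asynchronous main_loop. Its implementation is not shown in this document, so the following is only a sketch of how such a helper could behave, assuming it awaits whatever awaitable the call returns.

```python
import asyncio
import inspect


async def call_maybe_async(func, *args, **kwargs):
    """Call `func`; if the result is awaitable, await it before returning."""
    result = func(*args, **kwargs)
    if inspect.isawaitable(result):
        result = await result
    return result


def sync_hook():
    return "sync"


async def async_hook():
    return "async"


async def demo():
    # Both kinds of hook go through the same call site.
    return [await call_maybe_async(sync_hook), await call_maybe_async(async_hook)]
```

Checking the returned value rather than the function itself also covers callables that return a coroutine without being declared with async def.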

Your Main Loop Method

Implement your recurring logic in main_loop():

class MyApp(Application):
    def main_loop(self):
        # Read sensors
        temperature = self.platform_iface.get_ai(0)

        # Update state
        self.set_tag("temperature", temperature)

        # Check conditions
        if temperature > 30:
            self.platform_iface.set_do(0, True)  # Turn on cooling

Like setup(), your main_loop() can be synchronous or asynchronous:

async def main_loop(self):
    # Asynchronous operations
    data = await self.fetch_external_data()
    await self.process_data(data)

Loop Timing

The framework maintains consistent loop timing:

  • loop_target_period defines the target interval between iterations
  • If a loop takes longer than the target, the next iteration starts immediately
  • If a loop is consistently slow (>20% over target), a warning is logged

# Set a 2-second loop interval
self.loop_target_period = 2
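
The timing rules above reduce to two small calculations. The sketch below uses hypothetical helper names (time_to_sleep, is_slow) and checks a single iteration; how the framework decides that a loop is "consistently" slow is not specified here, so that part is not modelled.

```python
def time_to_sleep(loop_target_period: float, elapsed: float) -> float:
    """Seconds to wait so that iterations start `loop_target_period` apart.

    Returns 0.0 when the iteration overran, so the next one starts immediately.
    """
    return max(0.0, loop_target_period - elapsed)


def is_slow(loop_target_period: float, elapsed: float, tolerance: float = 0.2) -> bool:
    """True when an iteration ran more than `tolerance` (20%) over the target."""
    return elapsed > loop_target_period * (1 + tolerance)
```

With a 2-second target, an iteration that took 0.5 seconds leaves 1.5 seconds to wait, while one that took 3 seconds leaves none and counts as slow.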

Main Loop Error Handling

If an error occurs in the main loop:

  1. The error is logged
  2. Health status is set to unhealthy
  3. The application waits for _error_wait_period seconds
  4. The application restarts
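
The log, mark unhealthy, wait, and restart sequence can be sketched as a small supervisor loop. Everything here is illustrative: run_with_restart, the state dictionary, and max_restarts (added only so the sketch terminates) are assumptions, not pydoover internals.

```python
import asyncio
import logging

log = logging.getLogger("lifecycle_sketch")


async def run_with_restart(run_once, error_wait_period: float, max_restarts: int) -> dict:
    """Run `run_once`; on error, log it, mark unhealthy, wait, then restart."""
    state = {"healthy": True, "restarts": 0}
    while True:
        try:
            await run_once()
            state["healthy"] = True
            return state
        except Exception:
            log.exception("error in application, restarting")
            state["healthy"] = False
            if state["restarts"] >= max_restarts:
                return state  # give up; hypothetical limit for this sketch
            await asyncio.sleep(error_wait_period)
            state["restarts"] += 1
```

The wait before restarting keeps a persistent fault from turning into a tight crash loop.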

Phase 5: Shutdown

Shutdown can be triggered by:

  • Keyboard interrupt (Ctrl+C)
  • Scheduled shutdown from the cloud
  • Application request via request_shutdown()

Shutdown Process

The shutdown process closes each interface, then cancels any outstanding async tasks:

async def close(self):
    # Close Device Agent connections
    await self.device_agent.close()

    # Close Platform Interface
    await self.platform_iface.close()

    # Close Modbus Interface
    await self.modbus_iface.close()

    # Cancel all async tasks
    for task in asyncio.all_tasks():
        task.cancel()
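
Note that asyncio.all_tasks() also returns the task that is currently running, so a cancel-everything loop can cancel its own caller. One possible refinement, shown here as a standalone sketch with a hypothetical helper name rather than the framework's actual code, skips the current task and waits for the others to finish cancelling.

```python
import asyncio


async def cancel_other_tasks() -> int:
    """Cancel every task except the caller and wait for them to finish."""
    current = asyncio.current_task()
    others = [t for t in asyncio.all_tasks() if t is not current]
    for task in others:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising it here.
    await asyncio.gather(*others, return_exceptions=True)
    return len(others)


async def demo():
    workers = [asyncio.create_task(asyncio.sleep(60)) for _ in range(3)]
    await asyncio.sleep(0)  # let the workers start
    cancelled_count = await cancel_other_tasks()
    return cancelled_count, all(w.cancelled() for w in workers)
```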

Scheduled Shutdowns

The cloud can schedule shutdowns via the shutdown_at tag. When a shutdown is scheduled:

  1. The on_shutdown_at() callback is invoked
  2. Your application can perform cleanup
  3. The system shuts down at the scheduled time

class MyApp(Application):
    async def on_shutdown_at(self, dt: datetime):
        # Perform cleanup before shutdown
        self.log.info(f"Shutdown scheduled at {dt}")
        await self.save_state()

Checking Shutdown Readiness

Override check_can_shutdown() to control when shutdown is allowed:

class MyApp(Application):
    async def check_can_shutdown(self) -> bool:
        # Check if it's safe to shut down
        if self.is_processing:
            return False  # Don't shut down while processing
        return True

Application Readiness

The application provides methods to check and wait for readiness:

# Check if application is ready
if app.is_ready:
    print("Application is ready")

# Wait for application to be ready
await app.wait_until_ready()

The application is considered ready when:

  1. Setup has completed successfully
  2. UI communications are synchronized
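
A readiness flag with both a non-blocking check and an awaitable wait is commonly built on asyncio.Event. The class below is a hypothetical stand-in that mirrors the is_ready and wait_until_ready names; it does not claim to reflect how pydoover tracks setup completion or UI synchronization.

```python
import asyncio
from typing import Optional


class ReadinessFlag:
    """Tracks readiness with an asyncio.Event (hypothetical helper)."""

    def __init__(self):
        self._ready = asyncio.Event()

    @property
    def is_ready(self) -> bool:
        return self._ready.is_set()

    def mark_ready(self) -> None:
        """Called once setup (and any other precondition) has completed."""
        self._ready.set()

    async def wait_until_ready(self, timeout: Optional[float] = None) -> None:
        """Block until ready; raises a timeout error if `timeout` elapses first."""
        await asyncio.wait_for(self._ready.wait(), timeout)


async def demo():
    flag = ReadinessFlag()
    before = flag.is_ready
    # Simulate setup finishing shortly after startup.
    asyncio.get_running_loop().call_later(0.01, flag.mark_ready)
    await flag.wait_until_ready(timeout=1.0)
    return before, flag.is_ready
```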

Test Mode

For testing, applications can run in test mode:

app = MyApp(config=Schema(), test_mode=True)
asyncio.create_task(run_app(app, start=False))

# Wait for app to be ready
await app.wait_until_ready()

# Manually trigger main loop iterations
await app.next()  # Run one iteration
await app.next()  # Run another iteration

In test mode:

  • The main loop does not auto-execute
  • Use app.next() to manually trigger iterations
  • Useful for controlled testing scenarios

Healthcheck Server

Applications include an optional healthcheck server:

  • Runs on the configured healthcheck_port (default: 49200)
  • Returns HTTP 200 when healthy, 503 when unhealthy
  • Health is determined by successful main loop execution

# Configure healthcheck port
app = MyApp(config=Schema(), healthcheck_port=49201)
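
The 200/503 behaviour can be reproduced with the standard library to see what a monitoring probe would observe. This is a sketch under stated assumptions: the Health flag, HealthHandler, start_health_server, and probe are hypothetical names, and the real server's path, threading model, and response body are not documented here.

```python
import threading
from http.client import HTTPConnection
from http.server import BaseHTTPRequestHandler, HTTPServer


class Health:
    """Shared flag that a main loop would update (hypothetical)."""

    healthy = True


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # 200 when healthy, 503 otherwise.
        self.send_response(200 if Health.healthy else 503)
        self.end_headers()

    def log_message(self, *args):
        pass  # keep the sketch quiet


def start_health_server(port: int = 0) -> HTTPServer:
    """Serve in a daemon thread; port 0 lets the OS pick a free port."""
    server = HTTPServer(("127.0.0.1", port), HealthHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def probe(port: int) -> int:
    """Return the HTTP status code of the healthcheck endpoint."""
    conn = HTTPConnection("127.0.0.1", port, timeout=2)
    try:
        conn.request("GET", "/")
        return conn.getresponse().status
    finally:
        conn.close()
```

A container orchestrator or a simple script can call something like probe() periodically and restart the application when it stops returning 200.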

Lifecycle Summary

| Phase | Description | Your Code |
| --- | --- | --- |
| Initialization | Create app instance | Constructor parameters |
| Wait for DDA | Ensure cloud connectivity | None (automatic) |
| Setup | One-time initialization | setup() method |
| Main Loop | Continuous execution | main_loop() method |
| Shutdown | Graceful cleanup | on_shutdown_at(), check_can_shutdown() |