Skip to content

academy.runtime

RuntimeConfig dataclass

RuntimeConfig(
    cancel_actions_on_shutdown: bool = True,
    max_sync_concurrency: int | None = None,
    raise_loop_errors_on_shutdown: bool = True,
    shutdown_on_loop_error: bool = True,
    terminate_on_error: bool = True,
    terminate_on_success: bool = True,
)

Agent runtime configuration.

Attributes:

  • cancel_actions_on_shutdown (bool) –

    Cancel running actions when the agent is shutdown, otherwise wait for the actions to finish.

  • max_sync_concurrency (int | None) –

    Maximum number of concurrent sync tasks allowed via Agent.agent_run_sync(). This is used to set the number of threads in a default ThreadPoolExecutor.

  • raise_loop_errors_on_shutdown (bool) –

    Raise any captured loop errors when the agent is shutdown.

  • shutdown_on_loop_error (bool) –

    Shutdown the agent if any loop raises an error.

  • terminate_on_error (bool) –

    Terminate the agent by closing its mailbox permanently if the agent shuts down due to an error.

  • terminate_on_success (bool) –

    Terminate the agent by closing its mailbox permanently if the agent shuts down without an error.

Runtime

Runtime(
    agent: AgentT,
    *,
    exchange_factory: ExchangeFactory[ExchangeTransportT],
    registration: AgentRegistrationT,
    config: RuntimeConfig | None = None
)

Bases: Generic[AgentT], NoPickleMixin

Agent runtime manager.

The runtime is used to execute an agent by managing stateful resources, startup/shutdown, lifecycle hooks, and concurrency.

An agent can be run in two ways:

runtime = Runtime(agent, ...)

# Option 1: Async context manager
async with runtime:
    ...
    await runtime.wait_shutdown()

# Option 2: Run until complete
await runtime.run_until_complete()

Note

A runtime can only be used once, after which attempts to run an agent using the same runtime with raise a RuntimeError.

Note

If any @loop method raises an error, the agent will be signaled to shutdown if shutdown_on_loop_error is set in the config.

Parameters:

Source code in academy/runtime.py
def __init__(
    self,
    agent: AgentT,
    *,
    exchange_factory: ExchangeFactory[ExchangeTransportT],
    registration: AgentRegistrationT,
    config: RuntimeConfig | None = None,
) -> None:
    self.agent_id = registration.agent_id
    self.agent = agent
    self.factory = exchange_factory
    self.registration = registration
    self.config = config if config is not None else RuntimeConfig()

    self._actions = agent._agent_actions()
    self._loops = agent._agent_loops()

    self._started_event = asyncio.Event()
    self._shutdown_event = asyncio.Event()
    self._shutdown_options = _ShutdownState()
    self._agent_startup_called = False

    self._action_tasks: dict[uuid.UUID, asyncio.Task[None]] = {}
    self._loop_tasks: dict[str, asyncio.Task[None]] = {}
    self._loop_exceptions: list[tuple[str, Exception]] = []

    self._sync_executor = ThreadPoolExecutor(
        self.config.max_sync_concurrency,
        thread_name_prefix='agent-sync-executor-thread',
    )

    self._exchange_client: (
        AgentExchangeClient[AgentT, ExchangeTransportT] | None
    ) = None
    self._exchange_listener_task: asyncio.Task[None] | None = None
    self.exchange_context_token: (
        contextvars.Token[ExchangeClient[Any] | None] | None
    ) = None

action async

action(
    action: str,
    source_id: EntityId,
    *,
    args: Any,
    kwargs: Any
) -> Any

Invoke an action of the agent's agent.

Parameters:

  • action (str) –

    Name of action to invoke.

  • source_id (EntityId) –

    ID of the source that requested the action.

  • args (Any) –

    Tuple of positional arguments.

  • kwargs (Any) –

    Dictionary of keyword arguments.

Returns:

  • Any

    Result of the action.

Raises:

  • AttributeError

    If an action with this name is not implemented by the agent's agent.

Source code in academy/runtime.py
async def action(
    self,
    action: str,
    source_id: EntityId,
    *,
    args: Any,
    kwargs: Any,
) -> Any:
    """Invoke an action of the agent's agent.

    Args:
        action: Name of action to invoke.
        source_id: ID of the source that requested the action.
        args: Tuple of positional arguments.
        kwargs: Dictionary of keyword arguments.

    Returns:
        Result of the action.

    Raises:
        AttributeError: If an action with this name is not implemented by
            the agent's agent.
    """
    logger.debug('Invoking "%s" action on %s', action, self.agent_id)
    if action not in self._actions:
        raise AttributeError(
            f'{self.agent} does not have an action named "{action}".',
        )
    action_method = self._actions[action]
    if action_method._action_method_context:
        assert self._exchange_client is not None
        context = ActionContext(source_id, self._exchange_client)
        return await action_method(*args, context=context, **kwargs)
    else:
        return await action_method(*args, **kwargs)

run_until_complete async

run_until_complete() -> None

Run the agent until shutdown.

Agent startup involves:

  1. Creates a new exchange client for the agent.
  2. Sets the runtime context on the agent.
  3. Binds all handles of the agent to this agent's exchange client.
  4. Schedules a Task to listen for messages in the agent's mailbox in the exchange. Agent requests will not start processing until the end of the startup sequence.
  5. Schedules a Task for all control loops defined on the agent. Each task will block until the end of the startup sequence before starting the loop.
  6. Calls Agent.agent_on_startup().

After startup succeeds, this method waits for the agent to be shutdown, such as due to a failure in a control loop or receiving a shutdown message.

Agent shutdown involves:

  1. Calls Agent.agent_on_shutdown().
  2. Cancels running control loop tasks.
  3. Cancels the mailbox message listener task so no new requests are received.
  4. Waits for any currently executing actions to complete.
  5. Terminates the agent's mailbox in the exchange if configured.
  6. Closes the exchange client.

Raises:

  • RuntimeError

    If the agent has already been shutdown.

  • Exception

    Any exceptions raised during startup, shutdown, or inside of control loops.

Source code in academy/runtime.py
async def run_until_complete(self) -> None:
    """Run the agent until shutdown.

    Agent startup involves:

    1. Creates a new exchange client for the agent.
    1. Sets the runtime context on the agent.
    1. Binds all handles of the agent to this agent's exchange client.
    1. Schedules a [`Task`][asyncio.Task] to listen for messages in the
       agent's mailbox in the exchange. Agent requests will not start
       processing until the end of the startup sequence.
    1. Schedules a [`Task`][asyncio.Task] for all control loops defined on
       the agent. Each task will block until the end of the startup
       sequence before starting the loop.
    1. Calls
       [`Agent.agent_on_startup()`][academy.agent.Agent.agent_on_startup].

    After startup succeeds, this method waits for the agent to be shutdown,
    such as due to a failure in a control loop or receiving a shutdown
    message.

    Agent shutdown involves:

    1. Calls
       [`Agent.agent_on_shutdown()`][academy.agent.Agent.agent_on_shutdown].
    1. Cancels running control loop tasks.
    1. Cancels the mailbox message listener task so no new requests are
       received.
    1. Waits for any currently executing actions to complete.
    1. Terminates the agent's mailbox in the exchange if configured.
    1. Closes the exchange client.

    Raises:
        RuntimeError: If the agent has already been shutdown.
        Exception: Any exceptions raised during startup, shutdown, or
            inside of control loops.
    """
    async with self:
        await self.wait_shutdown()

signal_shutdown

signal_shutdown(
    *, expected: bool = True, terminate: bool | None = None
) -> None

Signal that the agent should exit.

If the agent has not started, this will cause the agent to immediately shutdown when next started. If the agent is shutdown, this has no effect.

Parameters:

  • expected (bool, default: True ) –

    If the reason for the shutdown was due to normal expected reasons or due to unexpected errors.

  • terminate (bool | None, default: None ) –

    Optionally override the mailbox termination settings in the run config.

Source code in academy/runtime.py
def signal_shutdown(
    self,
    *,
    expected: bool = True,
    terminate: bool | None = None,
) -> None:
    """Signal that the agent should exit.

    If the agent has not started, this will cause the agent to immediately
    shutdown when next started. If the agent is shutdown, this has no
    effect.

    Args:
        expected: If the reason for the shutdown was due to normal
            expected reasons or due to unexpected errors.
        terminate: Optionally override the mailbox termination settings
            in the run config.
    """
    self._shutdown_options = _ShutdownState(
        expected_shutdown=expected,
        terminate_override=terminate,
    )
    self._shutdown_event.set()

wait_shutdown async

wait_shutdown(timeout: float | None = None) -> None

Wait for agent shutdown to be signalled.

Parameters:

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for a shutdown event.

Raises:

  • TimeoutError

    If timeout was exceeded while waiting for agents.

Source code in academy/runtime.py
async def wait_shutdown(self, timeout: float | None = None) -> None:
    """Wait for agent shutdown to be signalled.

    Args:
        timeout: Optional timeout in seconds to wait for a shutdown event.

    Raises:
        TimeoutError: If `timeout` was exceeded while waiting for agents.
    """
    try:
        await asyncio.wait_for(
            self._shutdown_event.wait(),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # In Python 3.10 and older, asyncio.TimeoutError and TimeoutError
        # are different error types.
        raise TimeoutError(
            f'Agent shutdown was not signalled within {timeout} seconds.',
        ) from None