Skip to content

academy.runtime

RuntimeConfig dataclass

RuntimeConfig(
    cancel_actions_on_shutdown: bool = True,
    max_action_concurrency: int | None = None,
    shutdown_on_loop_error: bool = True,
    terminate_on_error: bool = True,
    terminate_on_success: bool = True,
)

Agent runtime configuration.

Attributes:

  • cancel_actions_on_shutdown (bool) –

    Cancel running actions when the agent is shutdown, otherwise wait for the actions to finish.

  • max_action_concurrency (int | None) –

    Maximum size of the thread pool used to concurrently execute action requests.

  • shutdown_on_loop_error (bool) –

    Shutdown the agent if any loop raises an error.

  • terminate_on_error (bool) –

    Terminate the agent by closing its mailbox permanently if the agent shuts down due to an error.

  • terminate_on_success (bool) –

    Terminate the agent by closing its mailbox permanently if the agent shuts down without an error.

Runtime

Runtime(
    agent: AgentT,
    *,
    exchange_factory: ExchangeFactory[ExchangeTransportT],
    registration: AgentRegistrationT,
    config: RuntimeConfig | None = None
)

Bases: Generic[AgentT], NoPickleMixin

Agent runtime manager.

The runtime is used to execute an agent by managing stateful resources, startup/shutdown, lifecycle hooks, and concurrency.

Note

This can only be run once. Calling run() multiple times will raise a RuntimeError.

Note

If any @loop method raises an error, the agent will be signaled to shutdown if shutdown_on_loop_error is set in the config.

Parameters:

Source code in academy/runtime.py
def __init__(
    self,
    agent: AgentT,
    *,
    exchange_factory: ExchangeFactory[ExchangeTransportT],
    registration: AgentRegistrationT,
    config: RuntimeConfig | None = None,
) -> None:
    self.agent_id = registration.agent_id
    self.agent = agent
    self.factory = exchange_factory
    self.registration = registration
    self.config = config if config is not None else RuntimeConfig()

    self._actions = agent._agent_actions()
    self._loops = agent._agent_loops()

    self._started_event = asyncio.Event()
    self._shutdown_event = asyncio.Event()
    self._shutdown_options = _ShutdownState()
    self._agent_startup_called = False

    self._action_tasks: dict[ActionRequest, asyncio.Task[None]] = {}
    self._loop_tasks: dict[str, asyncio.Task[None]] = {}
    self._loop_exceptions: list[tuple[str, Exception]] = []

    self._exchange_client: (
        AgentExchangeClient[AgentT, ExchangeTransportT] | None
    ) = None
    self._exchange_listener_task: asyncio.Task[None] | None = None

action async

action(
    action: str,
    source_id: EntityId,
    *,
    args: Any,
    kwargs: Any
) -> Any

Invoke an action of the agent's agent.

Parameters:

  • action (str) –

    Name of action to invoke.

  • source_id (EntityId) –

    ID of the source that requested the action.

  • args (Any) –

    Tuple of positional arguments.

  • kwargs (Any) –

    Dictionary of keyword arguments.

Returns:

  • Any

    Result of the action.

Raises:

  • AttributeError

    If an action with this name is not implemented by the agent's agent.

Source code in academy/runtime.py
async def action(
    self,
    action: str,
    source_id: EntityId,
    *,
    args: Any,
    kwargs: Any,
) -> Any:
    """Invoke an action of the agent's agent.

    Args:
        action: Name of action to invoke.
        source_id: ID of the source that requested the action.
        args: Tuple of positional arguments.
        kwargs: Dictionary of keyword arguments.

    Returns:
        Result of the action.

    Raises:
        AttributeError: If an action with this name is not implemented by
            the agent's agent.
    """
    logger.debug('Invoking "%s" action on %s', action, self.agent_id)
    if action not in self._actions:
        raise AttributeError(
            f'{self.agent} does not have an action named "{action}".',
        )
    action_method = self._actions[action]
    if action_method._action_method_context:
        assert self._exchange_client is not None
        context = ActionContext(source_id, self._exchange_client)
        return await action_method(*args, context=context, **kwargs)
    else:
        return await action_method(*args, **kwargs)

run async

run() -> None

Run the agent.

Agent startup involves:

  1. Creates a new exchange client for the agent.
  2. Sets the runtime context on the agent.
  3. Binds all handles of the agent to this agent's exchange client.
  4. Starts a Task to listen for messages in the agent's mailbox in the exchange.
  5. Starts a Task for all control loops defined on the agent.
  6. Calls Agent.agent_on_startup().

After startup succeeds, this method waits for the agent to be shutdown, such as due to a failure in a control loop or receiving a shutdown message.

Agent shutdown involves:

  1. Calls Agent.agent_on_shutdown().
  2. Cancels running control loop tasks.
  3. Cancels the mailbox message listener task so no new requests are received.
  4. Waits for any currently executing actions to complete.
  5. Terminates the agent's mailbox in the exchange if configured.
  6. Closes the exchange client.

Raises:

  • RuntimeError

    If the agent has already been shutdown.

  • Exception

    Any exceptions raised during startup, shutdown, or inside of control loops.

Source code in academy/runtime.py
async def run(self) -> None:
    """Run the agent.

    Agent startup involves:

    1. Creates a new exchange client for the agent.
    1. Sets the runtime context on the agent.
    1. Binds all handles of the agent to this agent's exchange client.
    1. Starts a [`Task`][asyncio.Task] to listen for messages in the
       agent's mailbox in the exchange.
    1. Starts a [`Task`][asyncio.Task] for all control loops defined on
       the agent.
    1. Calls
       [`Agent.agent_on_startup()`][academy.agent.Agent.agent_on_startup].

    After startup succeeds, this method waits for the agent to be shutdown,
    such as due to a failure in a control loop or receiving a shutdown
    message.

    Agent shutdown involves:

    1. Calls
       [`Agent.agent_on_shutdown()`][academy.agent.Agent.agent_on_shutdown].
    1. Cancels running control loop tasks.
    1. Cancels the mailbox message listener task so no new requests are
       received.
    1. Waits for any currently executing actions to complete.
    1. Terminates the agent's mailbox in the exchange if configured.
    1. Closes the exchange client.

    Raises:
        RuntimeError: If the agent has already been shutdown.
        Exception: Any exceptions raised during startup, shutdown, or
            inside of control loops.
    """
    try:
        await self._start()
    except:
        logger.exception('Agent startup failed (%r)', self)
        self.signal_shutdown(expected=False)
        await self._shutdown()
        raise

    try:
        await self._shutdown_event.wait()
    finally:
        await self._shutdown()

        # Raise loop exceptions so the caller of run() sees the errors,
        # even if the loop errors didn't cause the shutdown.
        raise_exceptions(
            (e for _, e in self._loop_exceptions),
            message='Caught failures in agent loops while shutting down.',
        )

signal_shutdown

signal_shutdown(
    *, expected: bool = True, terminate: bool | None = None
) -> None

Signal that the agent should exit.

If the agent has not started, this will cause the agent to immediately shutdown when next started. If the agent is shutdown, this has no effect.

Parameters:

  • expected (bool, default: True ) –

    If the reason for the shutdown was due to normal expected reasons or due to unexpected errors.

  • terminate (bool | None, default: None ) –

    Optionally override the mailbox termination settings in the run config.

Source code in academy/runtime.py
def signal_shutdown(
    self,
    *,
    expected: bool = True,
    terminate: bool | None = None,
) -> None:
    """Signal that the agent should exit.

    If the agent has not started, this will cause the agent to immediately
    shutdown when next started. If the agent is shutdown, this has no
    effect.

    Args:
        expected: If the reason for the shutdown was due to normal
            expected reasons or due to unexpected errors.
        terminate: Optionally override the mailbox termination settings
            in the run config.
    """
    self._shutdown_options = _ShutdownState(
        expected_shutdown=expected,
        terminate_override=terminate,
    )
    self._shutdown_event.set()