Skip to content

academy.manager

Manager

Manager(
    exchange: Exchange,
    launcher: Launcher | MutableMapping[str, Launcher],
    *,
    default_launcher: str | None = None
)

Bases: NoPickleMixin

Launch and manage running agents.

The manager is provided as convenience to reduce common boilerplate code for spawning agents and managing handles. Each manager registers itself as a client in the exchange (i.e., each manager has its own mailbox). Handles created by the manager are bound to this mailbox.

Tip

This class can be used as a context manager. Upon exiting the context, running agents will be shutdown, any agent handles created by the manager will be closed, and the exchange and launcher will be closed.

Note

The manager takes ownership of the exchange and launcher interfaces. This means the manager will be responsible for closing them once the manager is closed.

Parameters:

  • exchange (Exchange) –

    Exchange that agents and clients will use for communication.

  • launcher (Launcher | MutableMapping[str, Launcher]) –

    A mapping of names to launchers used to execute agents remotely. If a single launcher is provided directly, it is set as the default with name 'default', overriding any value of default_launcher.

  • default_launcher (str | None, default: None ) –

    Specify the name of the default launcher to use when not specified in launch().

Raises:

  • ValueError

    If default_launcher is specified but does not exist in launchers.

Source code in academy/manager.py
def __init__(
    self,
    exchange: Exchange,
    launcher: Launcher | MutableMapping[str, Launcher],
    *,
    default_launcher: str | None = None,
) -> None:
    if isinstance(launcher, Launcher):
        launcher = {'default': launcher}
        default_launcher = 'default'

    if default_launcher is not None and default_launcher not in launcher:
        raise ValueError(
            f'No launcher named "{default_launcher}" was provided to '
            'use as the default.',
        )

    self._exchange = exchange
    self._launchers = launcher
    self._default_launcher = default_launcher

    self._mailbox_id = exchange.register_client()
    self._multiplexer = MailboxMultiplexer(
        self.mailbox_id,
        self._exchange,
        self._handle_request,
    )
    self._handles: dict[AgentId[Any], BoundRemoteHandle[Any]] = {}
    self._aid_to_launcher: dict[AgentId[Any], str] = {}
    self._listener_thread = threading.Thread(
        target=self._multiplexer.listen,
        name=f'multiplexer-{self.mailbox_id.uid}-listener',
    )
    self._listener_thread.start()
    logger.info(
        'Initialized manager (%s; %s)',
        self._mailbox_id,
        self._exchange,
    )

exchange property

exchange: Exchange

Exchange interface.

mailbox_id property

mailbox_id: ClientId

EntityId of the mailbox used by this manager.

close

close() -> None

Close the manager and cleanup resources.

  1. Call shutdown on all running agents.
  2. Close all handles created by the manager.
  3. Close the mailbox associated with the manager.
  4. Close the exchange.
  5. Close all launchers.
Source code in academy/manager.py
def close(self) -> None:
    """Close the manager and cleanup resources.

    1. Call shutdown on all running agents.
    1. Close all handles created by the manager.
    1. Close the mailbox associated with the manager.
    1. Close the exchange.
    1. Close all launchers.
    """
    for launcher in self._launchers.values():
        for agent_id in launcher.running():
            handle = self._handles[agent_id]
            with contextlib.suppress(MailboxClosedError):
                handle.shutdown()
    logger.debug('Instructed managed agents to shutdown')
    self._multiplexer.close_bound_handles()
    self._multiplexer.terminate()
    self._listener_thread.join()
    self.exchange.close()
    for launcher in self._launchers.values():
        launcher.close()
    logger.info('Closed manager (%s)', self.mailbox_id)

add_launcher

add_launcher(name: str, launcher: Launcher) -> Self

Add a launcher to the manager.

Note

It is not possible to remove a launcher as this could create complications if an agent was already launched using a given launcher.

Parameters:

  • name (str) –

    Name of the launcher.

  • launcher (Launcher) –

    Launcher instance.

Returns:

  • Self

    Self for chaining.

Raises:

  • ValueError

    If a launcher with name already exists.

Source code in academy/manager.py
def add_launcher(self, name: str, launcher: Launcher) -> Self:
    """Add a launcher to the manager.

    Note:
        It is not possible to remove a launcher as this could create
        complications if an agent was already launched using a given
        launcher.

    Args:
        name: Name of the launcher.
        launcher: Launcher instance.

    Returns:
        Self for chaining.

    Raises:
        ValueError: If a launcher with `name` already exists.
    """
    if name in self._launchers:
        raise ValueError(f'Launcher named "{name}" already exists.')
    self._launchers[name] = launcher
    return self

set_default_launcher

set_default_launcher(name: str | None) -> Self

Set the name of the default launcher.

Parameters:

  • name (str | None) –

    Name of the launcher to default to. If None, no default launcher is set and all calls to launch() must specify the launcher.

Returns:

  • Self

    Self for chaining.

Raises:

Source code in academy/manager.py
def set_default_launcher(self, name: str | None) -> Self:
    """Set the name of the default launcher.

    Args:
        name: Name of the launcher to default to. If `None`, no default
            launcher is set and all calls to `launch()` must specify
            the launcher.

    Returns:
        Self for chaining.

    Raises:
        ValueError: If no launcher with `name` exists.
    """
    if name not in self._launchers:
        raise ValueError(f'A launcher name "{name}" does not exist.')
    self._default_launcher = name
    return self

launch

launch(
    behavior: BehaviorT,
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    launcher: str | None = None,
    name: str | None = None
) -> BoundRemoteHandle[BehaviorT]

Launch a new agent with a specified behavior.

Note

Compared to Launcher.launch(), this method will inject the exchange and return a client-bound handle.

Parameters:

  • behavior (BehaviorT) –

    Behavior the agent should implement.

  • agent_id (AgentId[BehaviorT] | None, default: None ) –

    Specify ID of the launched agent. If None, a new agent ID will be created within the exchange.

  • launcher (str | None, default: None ) –

    Name of the launcher instance to use. In None, uses the default launcher if specified, otherwise raises an error.

  • name (str | None, default: None ) –

    Readable name of the agent. Ignored if agent_id is provided.

Returns:

  • BoundRemoteHandle[BehaviorT]

    Handle (client bound) used to interact with the agent.

Raises:

  • ValueError

    If no default launcher is set and launcher is not specified.

Source code in academy/manager.py
def launch(
    self,
    behavior: BehaviorT,
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    launcher: str | None = None,
    name: str | None = None,
) -> BoundRemoteHandle[BehaviorT]:
    """Launch a new agent with a specified behavior.

    Note:
        Compared to `Launcher.launch()`, this method will inject the
        exchange and return a client-bound handle.

    Args:
        behavior: Behavior the agent should implement.
        agent_id: Specify ID of the launched agent. If `None`, a new
            agent ID will be created within the exchange.
        launcher: Name of the launcher instance to use. In `None`, uses
            the default launcher if specified, otherwise raises an error.
        name: Readable name of the agent. Ignored if `agent_id` is
            provided.

    Returns:
        Handle (client bound) used to interact with the agent.

    Raises:
        ValueError: If no default launcher is set and `launcher` is not
            specified.
    """
    if self._default_launcher is None and launcher is None:
        raise ValueError(
            'Must specify the launcher when no default is set.',
        )
    launcher = launcher if launcher is not None else self._default_launcher
    assert launcher is not None
    launcher_instance = self._launchers[launcher]
    unbound = launcher_instance.launch(
        behavior,
        exchange=self.exchange,
        agent_id=agent_id,
        name=name,
    )
    self._aid_to_launcher[unbound.agent_id] = launcher
    logger.info('Launched agent (%s; %s)', unbound.agent_id, behavior)
    bound = self._multiplexer.bind(unbound)
    self._handles[bound.agent_id] = bound
    logger.debug('Bound agent handle to manager (%s)', bound)
    return bound

shutdown

shutdown(
    agent_id: AgentId[Any],
    *,
    blocking: bool = True,
    timeout: float | None = None
) -> None

Shutdown a launched agent.

Parameters:

  • agent_id (AgentId[Any]) –

    ID of launched agent.

  • blocking (bool, default: True ) –

    Wait for the agent to exit before returning.

  • timeout (float | None, default: None ) –

    Optional timeout is seconds when blocking=True.

Raises:

  • BadEntityIdError

    If an agent with agent_id was not launched by this launcher.

  • TimeoutError

    If timeout was exceeded while blocking for agent.

Source code in academy/manager.py
def shutdown(
    self,
    agent_id: AgentId[Any],
    *,
    blocking: bool = True,
    timeout: float | None = None,
) -> None:
    """Shutdown a launched agent.

    Args:
        agent_id: ID of launched agent.
        blocking: Wait for the agent to exit before returning.
        timeout: Optional timeout is seconds when `blocking=True`.

    Raises:
        BadEntityIdError: If an agent with `agent_id` was not
            launched by this launcher.
        TimeoutError: If `timeout` was exceeded while blocking for agent.
    """
    try:
        handle = self._handles[agent_id]
    except KeyError:
        raise BadEntityIdError(agent_id) from None

    with contextlib.suppress(MailboxClosedError):
        handle.shutdown()

    if blocking:
        self.wait(agent_id, timeout=timeout)

wait

wait(
    agent: AgentId[Any] | RemoteHandle[Any],
    *,
    timeout: float | None = None
) -> None

Wait for a launched agent to exit.

Parameters:

  • agent (AgentId[Any] | RemoteHandle[Any]) –

    ID or handle to the launched agent.

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for agent.

Raises:

  • BadEntityIdError

    If the agent was not found. This likely means the agent was not launched by this launcher.

  • TimeoutError

    If timeout was exceeded while waiting for agent.

Source code in academy/manager.py
def wait(
    self,
    agent: AgentId[Any] | RemoteHandle[Any],
    *,
    timeout: float | None = None,
) -> None:
    """Wait for a launched agent to exit.

    Args:
        agent: ID or handle to the launched agent.
        timeout: Optional timeout in seconds to wait for agent.

    Raises:
        BadEntityIdError: If the agent was not found. This likely means
            the agent was not launched by this launcher.
        TimeoutError: If `timeout` was exceeded while waiting for agent.
    """
    agent_id = agent.agent_id if isinstance(agent, RemoteHandle) else agent
    if agent_id not in self._aid_to_launcher:
        raise BadEntityIdError(agent_id)
    launcher_name = self._aid_to_launcher[agent_id]
    self._launchers[launcher_name].wait(agent_id, timeout=timeout)