Skip to content

academy.manager

Manager

Manager(
    exchange_client: UserExchangeClient[ExchangeTransportT],
    executors: Executor | MutableMapping[str, Executor],
    *,
    default_executor: str | None = None,
    max_retries: int = 0
)

Bases: Generic[ExchangeTransportT], NoPickleMixin

Launch and manage running agents.

A manager is used to launch agents using one or more Executors and interact with/manage those agents.

Tip

This class can be used as a context manager. Upon exiting the context, running agents will be shut down, any agent handles created by the manager will be closed, and the executors will be shut down.

Tip

When using ProcessPoolExecutors, use the initializer argument to configure logging in the worker processes that will execute agents.

Note

The manager takes ownership of the exchange client and executors, meaning the manager will clean up those resources when the manager is closed.

Parameters:

  • exchange_client (UserExchangeClient[ExchangeTransportT]) –

    Exchange client.

  • executors (Executor | MutableMapping[str, Executor]) –

    An executor instance or mapping of names to executors to use to run agents. If a single executor is provided, it is set as the default executor with name 'default', overriding any value of default_executor.

  • default_executor (str | None, default: None ) –

    Specify the name of the default executor to use when not specified in launch().

  • max_retries (int, default: 0 ) –

    Maximum number of times to retry running an agent if it exits with an error.

Raises:

  • ValueError

    If default_executor is specified but does not exist in executors.

Source code in academy/manager.py
def __init__(
    self,
    exchange_client: UserExchangeClient[ExchangeTransportT],
    executors: Executor | MutableMapping[str, Executor],
    *,
    default_executor: str | None = None,
    max_retries: int = 0,
) -> None:
    # A lone executor is wrapped in a one-entry mapping under the name
    # 'default', and that name always becomes the default executor
    # (any caller-supplied ``default_executor`` is intentionally ignored).
    if isinstance(executors, Executor):
        default_executor = 'default'
        executors = {default_executor: executors}

    # An explicitly named default must refer to a provided executor.
    named_default_missing = (
        default_executor is not None and default_executor not in executors
    )
    if named_default_missing:
        raise ValueError(
            f'No executor named "{default_executor}" was provided to '
            'use as the default.',
        )

    # The manager owns the exchange client and derives a client factory
    # (passed to launched agents) plus its own user ID from it.
    self._exchange_client = exchange_client
    self._exchange_factory = exchange_client.factory()
    self._user_id = exchange_client.client_id
    self._executors = executors
    self._default_executor = default_executor
    self._max_retries = max_retries

    # Cached handles, and one agent control block per launched agent.
    self._handles: dict[AgentId[Any], RemoteHandle[Any]] = {}
    self._acbs: dict[AgentId[Any], _ACB[Any]] = {}

    logger.info('Initialized manager (%s)', self.user_id)

exchange_client property

User client for the exchange.

exchange_factory property

Client factory for the exchange.

user_id property

user_id: UserId

Exchange client user ID of this manager.

from_exchange_factory async classmethod

from_exchange_factory(
    factory: ExchangeFactory[ExchangeTransportT],
    executors: Executor | MutableMapping[str, Executor],
    *,
    default_executor: str | None = None,
    max_retries: int = 0
) -> Self

Instantiate a new exchange client and manager from a factory.

Source code in academy/manager.py
@classmethod
async def from_exchange_factory(
    cls,
    factory: ExchangeFactory[ExchangeTransportT],
    executors: Executor | MutableMapping[str, Executor],
    *,
    default_executor: str | None = None,
    max_retries: int = 0,
) -> Self:
    """Instantiate a new exchange client and manager from a factory.

    Args:
        factory: Exchange factory used to create a new user client.
        executors: Executor instance or mapping of names to executors,
            forwarded unchanged to the constructor.
        default_executor: Name of the default executor, forwarded to the
            constructor.
        max_retries: Maximum agent retry count, forwarded to the
            constructor.

    Returns:
        A manager owning the newly created user client.
    """
    # The user client is created first, then handed to the constructor,
    # which takes ownership of it.
    return cls(
        await factory.create_user_client(),
        executors,
        default_executor=default_executor,
        max_retries=max_retries,
    )

close async

close() -> None

Shutdown the manager and cleanup resources.

  1. Request all running agents to shut down.
  2. Wait for all running agents to shut down.
  3. Close the exchange client.
  4. Shutdown the executors.
  5. Raise any exceptions returned by agents.

Raises:

  • Exception

    Any exceptions raised by agents.

Source code in academy/manager.py
async def close(self) -> None:
    """Shutdown the manager and cleanup resources.

    1. Request all running agents to shut down.
    1. Wait for all running agents to shut down.
    1. Close the exchange client.
    1. Shutdown the executors.
    1. Raise any exceptions returned by agents.

    The exchange client is closed and the executors are shut down even
    when some agents failed; agent failures are only raised at the end,
    after every agent has finished.

    Raises:
        Exception: Any exceptions raised by agents.
    """
    # Snapshot the control blocks: the loops below await, and a concurrent
    # launch() inserting into self._acbs would otherwise break iteration.
    acbs = tuple(self._acbs.values())

    for acb in acbs:
        if not acb.task.done():
            handle = self.get_handle(acb.agent_id)
            with contextlib.suppress(AgentTerminatedError):
                await handle.shutdown()
    logger.debug('Requested shutdown from all agents')

    # Awaiting each task directly would re-raise the first agent failure
    # right here and skip the cleanup below (and the remaining agents), so
    # wait for completion without propagating; failures are reported last.
    # asyncio.wait() rejects an empty collection, hence the guard.
    if acbs:
        await asyncio.wait([acb.task for acb in acbs])
    logger.debug('All agents have completed')

    await self.exchange_client.close()
    for executor in self._executors.values():
        executor.shutdown(wait=True, cancel_futures=True)

    # Task.exception() raises CancelledError for cancelled tasks, which
    # have no failure to report, so they are skipped.
    exceptions = (
        acb.task.exception() for acb in acbs if not acb.task.cancelled()
    )
    exceptions_only = tuple(e for e in exceptions if e is not None)
    raise_exceptions(
        exceptions_only,
        message='Caught failures in agent while shutting down.',
    )

    logger.info('Closed manager (%s)', self.user_id)

add_executor

add_executor(name: str, executor: Executor) -> Self

Add an executor to the manager.

Note

It is not possible to remove an executor as this could create complications if an agent is already running in that executor.

Parameters:

  • name (str) –

    Name of the executor used when launching agents.

  • executor (Executor) –

    Executor instance.

Returns:

  • Self

    Self for chaining.

Raises:

  • ValueError

    If an executor with name already exists.

Source code in academy/manager.py
def add_executor(self, name: str, executor: Executor) -> Self:
    """Register an additional executor with the manager under `name`.

    Note:
        Executors cannot be removed once added: an agent may already be
        running in that executor, and removing it could cause problems.

    Args:
        name: Name used to select this executor when launching agents.
        executor: Executor instance to register.

    Returns:
        The manager itself, so calls can be chained.

    Raises:
        ValueError: If an executor is already registered as `name`.
    """
    registry = self._executors
    # Names are unique; silently replacing an executor could orphan
    # agents already running in it.
    if name in registry:
        raise ValueError(f'Executor named "{name}" already exists.')
    registry[name] = executor
    return self

set_default_executor

set_default_executor(name: str | None) -> Self

Set the default executor by name.

Parameters:

  • name (str | None) –

    Name of the executor to use as default. If None, no default executor is set and all calls to launch() must specify the executor.

Returns:

  • Self

    Self for chaining.

Raises:

  • ValueError

    If no executor with name exists.

Source code in academy/manager.py
def set_default_executor(self, name: str | None) -> Self:
    """Set the default executor by name.

    Args:
        name: Name of the executor to use as default. If `None`, no
            default executor is set and all calls to `launch()` must
            specify the executor.

    Returns:
        Self for chaining.

    Raises:
        ValueError: If `name` is not `None` and no executor with `name`
            exists.
    """
    # `None` is a documented, valid value that clears the default; only
    # actual names need to be validated against the registered executors.
    if name is not None and name not in self._executors:
        raise ValueError(f'An executor named "{name}" does not exist.')
    self._default_executor = name
    return self

launch async

launch(
    agent: AgentT | type[AgentT],
    *,
    args: tuple[Any, ...] | None = None,
    kwargs: dict[str, Any] | None = None,
    config: RuntimeConfig | None = None,
    executor: str | None = None,
    name: str | None = None,
    registration: AgentRegistration[AgentT] | None = None
) -> RemoteHandle[AgentT]

Launch a new agent from the specified agent instance or agent type.

Parameters:

  • agent (AgentT | type[AgentT]) –

    Agent instance to run, or the agent type that will be initialized on the worker using args and kwargs.

  • args (tuple[Any, ...] | None, default: None ) –

    Positional arguments used to initialize the agent. Ignored if agent is already an instance.

  • kwargs (dict[str, Any] | None, default: None ) –

    Keyword arguments used to initialize the agent. Ignored if agent is already an instance.

  • config (RuntimeConfig | None, default: None ) –

    Agent run configuration.

  • executor (str | None, default: None ) –

    Name of the executor instance to use. If None, uses the default executor, if specified, otherwise raises an error.

  • name (str | None, default: None ) –

    Readable name of the agent used when registering a new agent.

  • registration (AgentRegistration[AgentT] | None, default: None ) –

    If None, a new agent will be registered with the exchange.

Returns:

  • RemoteHandle[AgentT]

    Handle (client bound) used to interact with the agent.

Raises:

  • RuntimeError

    If registration is provided and an agent with that ID has already been executed.

  • ValueError

    If no default executor is set and executor is not specified.

Source code in academy/manager.py
async def launch(  # noqa: PLR0913
    self,
    agent: AgentT | type[AgentT],
    *,
    args: tuple[Any, ...] | None = None,
    kwargs: dict[str, Any] | None = None,
    config: RuntimeConfig | None = None,
    executor: str | None = None,
    name: str | None = None,
    registration: AgentRegistration[AgentT] | None = None,
) -> RemoteHandle[AgentT]:
    """Launch a new agent from the specified agent instance or type.

    Args:
        agent: Agent instance to run, or the agent type that will be
            initialized on the worker using `args` and `kwargs`.
        args: Positional arguments used to initialize the agent.
            Ignored if `agent` is already an instance.
        kwargs: Keyword arguments used to initialize the agent.
            Ignored if `agent` is already an instance.
        config: Agent run configuration. A default
            [`RuntimeConfig`][academy.runtime.RuntimeConfig] is used when
            omitted.
        executor: Name of the executor instance to use. If `None`, uses
            the default executor, if specified, otherwise raises an error.
        name: Readable name of the agent used when registering a new agent.
        registration: Existing registration to run. If `None`, a new
            agent will be registered with the exchange.

    Returns:
        Handle (client bound) used to interact with the agent.

    Raises:
        RuntimeError: If `registration` is provided and an agent with
            that ID has already been executed.
        ValueError: If no default executor is set and `executor` is not
            specified.
    """
    # Resolve which executor to run in, falling back to the default.
    if executor is None:
        if self._default_executor is None:
            raise ValueError(
                'Must specify the executor when no default is set.',
            )
        executor = self._default_executor
    # NOTE(review): an unknown executor name surfaces as a KeyError here.
    executor_instance = self._executors[executor]

    # Either reuse a caller-provided registration (each may be run at most
    # once by this manager) or register a fresh agent with the exchange.
    if registration is not None:
        if registration.agent_id in self._acbs:
            raise RuntimeError(
                f'{registration.agent_id} has already been executed.',
            )
    else:
        agent_type = agent if isinstance(agent, type) else type(agent)
        registration = await self.register_agent(agent_type, name=name)

    agent_id = registration.agent_id

    spec = _RunSpec(
        agent=agent,
        config=config if config is not None else RuntimeConfig(),
        exchange_factory=self.exchange_factory,
        registration=registration,
        agent_args=args if args is not None else (),
        agent_kwargs=kwargs if kwargs is not None else {},
    )

    # The agent runs in a background task tracked by its control block.
    runner = self._run_agent_in_executor(executor_instance, spec)
    task = asyncio.create_task(runner, name=f'manager-run-{agent_id}')
    self._acbs[agent_id] = _ACB(agent_id=agent_id, executor=executor, task=task)

    handle = self.get_handle(agent_id)
    logger.info('Launched agent (%s; %s)', agent_id, agent)
    self._warn_executor_overloaded(executor_instance, executor)
    return handle

get_handle

get_handle(
    agent: AgentId[AgentT] | AgentRegistration[AgentT],
) -> RemoteHandle[AgentT]

Create a new handle to an agent.

A handle acts like a reference to a remote agent, enabling a user to manage the agent or asynchronously invoke actions.

Parameters:

  • agent (AgentId[AgentT] | AgentRegistration[AgentT]) –

    Agent ID or registration indicating the agent to create a handle to. The agent must be registered with the same exchange that this manager is a client of.

Returns:

  • RemoteHandle[AgentT]

    Handle to the agent.

Source code in academy/manager.py
def get_handle(
    self,
    agent: AgentId[AgentT] | AgentRegistration[AgentT],
) -> RemoteHandle[AgentT]:
    """Create a new handle to an agent.

    A handle acts like a reference to a remote agent, enabling a user
    to manage the agent or asynchronously invoke actions. Handles are
    cached per agent ID; a cached handle is reused unless it has been
    closed, in which case a fresh one replaces it in the cache.

    Args:
        agent: Agent ID or registration indicating the agent to create
            a handle to. The agent must be registered with the same
            exchange that this manager is a client of.

    Returns:
        Handle to the agent.
    """
    agent_id = agent if isinstance(agent, AgentId) else agent.agent_id

    cached = self._handles.get(agent_id)
    if cached is None or cached.closed():
        # No usable cached handle: ask the exchange client for a new one.
        cached = self.exchange_client.get_handle(agent_id)
        self._handles[agent_id] = cached
    return cached

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]

Register a new agent with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

  • AgentRegistration[AgentT]

    Agent registration info that can be passed to launch().

Source code in academy/manager.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistration[AgentT]:
    """Register a new agent with the exchange.

    This delegates directly to the manager's exchange client.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info that can be passed to
        [`launch()`][academy.manager.Manager.launch].
    """
    client = self.exchange_client
    registration = await client.register_agent(agent, name=name)
    return registration

running

running() -> set[AgentId[Any]]

Get a set of IDs of all running agents.

Returns:

  • set[AgentId[Any]]

    Set of agent IDs corresponding to all agents launched by this manager that have not completed yet.

Source code in academy/manager.py
def running(self) -> set[AgentId[Any]]:
    """Get a set of IDs of all running agents.

    Returns:
        Set of agent IDs corresponding to all agents launched by this \
        manager that have not completed yet.
    """
    # An agent counts as running while its background task is unfinished.
    return {
        control.agent_id
        for control in self._acbs.values()
        if not control.task.done()
    }

shutdown async

shutdown(
    agent: AgentId[Any] | RemoteHandle[Any],
    *,
    blocking: bool = True,
    raise_error: bool = True,
    terminate: bool | None = None,
    timeout: float | None = None
) -> None

Shutdown a launched agent.

Parameters:

  • agent (AgentId[Any] | RemoteHandle[Any]) –

    ID or handle to the launched agent.

  • blocking (bool, default: True ) –

    Wait for the agent to exit before returning.

  • raise_error (bool, default: True ) –

    Raise the error returned by the agent if blocking=True.

  • terminate (bool | None, default: None ) –

    Override the termination behavior of the agent defined in the RuntimeConfig.

  • timeout (float | None, default: None ) –

    Optional timeout in seconds when blocking=True.

Raises:

  • BadEntityIdError

    If an agent with agent_id was not launched by this manager.

  • TimeoutError

    If timeout was exceeded while blocking for agent.

Source code in academy/manager.py
async def shutdown(
    self,
    agent: AgentId[Any] | RemoteHandle[Any],
    *,
    blocking: bool = True,
    raise_error: bool = True,
    terminate: bool | None = None,
    timeout: float | None = None,
) -> None:
    """Shutdown a launched agent.

    Does nothing if the agent has already completed.

    Args:
        agent: ID or handle to the launched agent.
        blocking: Wait for the agent to exit before returning.
        raise_error: Raise the error returned by the agent if
            `blocking=True`.
        terminate: Override the termination behavior of the agent defined
            in the [`RuntimeConfig`][academy.runtime.RuntimeConfig].
        timeout: Optional timeout in seconds when `blocking=True`.

    Raises:
        BadEntityIdError: If an agent with `agent_id` was not
            launched by this manager.
        TimeoutError: If `timeout` was exceeded while blocking for agent.
    """
    if isinstance(agent, RemoteHandle):
        agent_id = agent.agent_id
    else:
        agent_id = agent

    control = self._acbs.get(agent_id)
    if control is None:
        raise BadEntityIdError(agent_id) from None
    if control.task.done():
        # Already finished; there is nothing to shut down.
        return

    # The agent may terminate before acknowledging the request; that is
    # not treated as an error here.
    handle = self.get_handle(agent_id)
    with contextlib.suppress(AgentTerminatedError):
        await handle.shutdown(terminate=terminate)

    if not blocking:
        return
    await self.wait(
        {agent_id},
        raise_error=raise_error,
        timeout=timeout,
    )

wait async

wait(
    agents: Iterable[AgentId[Any] | RemoteHandle[Any]],
    *,
    raise_error: bool = False,
    return_when: str = ALL_COMPLETED,
    timeout: float | None = None
) -> None

Wait for launched agents to complete.

Note

Calling wait() is only valid after launch() has succeeded.

Parameters:

  • agents (Iterable[AgentId[Any] | RemoteHandle[Any]]) –

    An iterable of agent IDs or handles to wait on.

  • raise_error (bool, default: False ) –

    Raise errors returned by completed agents.

  • return_when (str, default: ALL_COMPLETED ) –

    Indicate when this function should return. The same as asyncio.wait().

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for agents.

Raises:

  • BadEntityIdError

    If an agent was not launched by this manager.

  • TimeoutError

    If timeout was exceeded while waiting for agents.

  • Exception

    Any exception raised by an agent that completed due to a failure and raise_error=True is set.

Source code in academy/manager.py
async def wait(
    self,
    agents: Iterable[AgentId[Any] | RemoteHandle[Any]],
    *,
    raise_error: bool = False,
    return_when: str = asyncio.ALL_COMPLETED,
    timeout: float | None = None,
) -> None:
    """Wait for launched agents to complete.

    Note:
        Calling `wait()` is only valid after `launch()` has succeeded.

    Args:
        agents: An iterable of agent IDs or handles to wait on.
        raise_error: Raise errors returned by completed agents. Only
            agents that have finished when this method returns are
            inspected; cancelled agents are not reported as errors.
        return_when: Indicate when this function should return. The
            same as [`asyncio.wait()`][asyncio.wait].
        timeout: Optional timeout in seconds to wait for agents.

    Raises:
        BadEntityIdError: If an agent was not launched by this manager.
        TimeoutError: If `timeout` was exceeded while waiting for agents.
        Exception: Any exception raised by an agent that completed due
            to a failure and `raise_error=True` is set.
    """
    agent_ids = {
        agent if isinstance(agent, AgentId) else agent.agent_id
        for agent in agents
    }

    # asyncio.wait() rejects an empty collection, so return early.
    if not agent_ids:
        return

    agent_tasks: list[asyncio.Task[None]] = []
    for agent_id in agent_ids:
        try:
            agent_tasks.append(self._acbs[agent_id].task)
        except KeyError:
            raise BadEntityIdError(agent_id) from None

    done, pending = await asyncio.wait(
        agent_tasks,
        return_when=return_when,
        timeout=timeout,
    )

    if not done:
        raise TimeoutError(
            f'No agents completed within {timeout} seconds: '
            f'{len(pending)} pending agent(s).',
        )
    elif return_when == asyncio.ALL_COMPLETED and pending:
        raise TimeoutError(
            f'Not all agents completed within {timeout} seconds: '
            f'{len(pending)} pending agent(s).',
        )

    if raise_error:
        # With FIRST_COMPLETED/FIRST_EXCEPTION some tasks may still be
        # pending, and Task.exception() raises InvalidStateError for them;
        # for cancelled tasks it raises CancelledError. Only inspect tasks
        # that finished normally or with an error.
        exceptions = (
            task.exception()
            for task in agent_tasks
            if task.done() and not task.cancelled()
        )
        exceptions_only = tuple(e for e in exceptions if e is not None)
        raise_exceptions(
            exceptions_only,
            message='Waited agents raised the following exceptions.',
        )