Skip to content

academy.launcher

Launcher

Launcher(
    executor: Executor,
    *,
    close_exchange: bool = True,
    max_restarts: int = 0
)

Launcher that wraps a concurrent.futures.Executor.

Parameters:

  • executor (Executor) –

    Executor used for launching agents. Note that this class takes ownership of the executor.

  • close_exchange (bool, default: True ) –

    Passed along to the Agent constructor. This should typically be True, the default, when the executor runs agents in separate processes, but should be False for the ThreadPoolExecutor to avoid closing shared exchange objects.

  • max_restarts (int, default: 0 ) –

    Maximum number of times to restart an agent if it exits with an error.

Source code in academy/launcher.py
def __init__(
    self,
    executor: Executor,
    *,
    close_exchange: bool = True,
    max_restarts: int = 0,
) -> None:
    """Initialize the launcher around an existing executor.

    Args:
        executor: Executor used for launching agents. It is stored on
            the launcher and shut down by `close()`.
        close_exchange: Flag stored for later use when agents are
            started.
        max_restarts: Maximum number of restarts, stored for later use
            when an agent exits with an error.
    """
    # Bookkeeping tables start empty: agent control blocks keyed by
    # agent ID, and a lookup from a submitted future to its block.
    self._future_to_acb: dict[Future[None], _ACB[Any]] = {}
    self._acbs: dict[AgentId[Any], _ACB[Any]] = {}

    # Configuration is kept verbatim from the caller.
    self._max_restarts = max_restarts
    self._close_exchange = close_exchange
    self._executor = executor

close

close() -> None

Close the launcher.

Warning

This will not return until all agents have exited. It is the caller's responsibility to shut down agents prior to closing the launcher.

Source code in academy/launcher.py
def close(self) -> None:
    """Close the launcher.

    Errors from agents that have already finished are re-raised so the
    caller is sure to see them. The executor is shut down even when
    such an error is raised, so its workers are not leaked.

    Warning:
        This will not return until all agents have exited. It is the
        caller's responsibility to shut down agents prior to closing
        the launcher.

    Raises:
        Exception: The first error found on the future of an agent
            that has already completed.
    """
    logger.debug('Waiting for agents to shutdown...')
    try:
        for acb in self._acbs.values():
            if acb.done.is_set() and acb.future is not None:
                # Raise possible errors from agents so user is sure
                # to see them.
                acb.future.result()
    finally:
        # Always release the executor, even when an agent error is
        # propagating; otherwise the shutdown would be skipped.
        self._executor.shutdown(wait=True, cancel_futures=True)
    logger.debug('Closed launcher (%s)', self)

launch

launch(
    behavior: BehaviorT,
    exchange: Exchange,
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    name: str | None = None
) -> RemoteHandle[BehaviorT]

Launch a new agent with a specified behavior.

Parameters:

  • behavior (BehaviorT) –

    Behavior the agent should implement.

  • exchange (Exchange) –

    Exchange the agent will use for messaging.

  • agent_id (AgentId[BehaviorT] | None, default: None ) –

    Specify ID of the launched agent. If None, a new agent ID will be created within the exchange.

  • name (str | None, default: None ) –

    Readable name of the agent. Ignored if agent_id is provided.

Returns:

  • RemoteHandle[BehaviorT]

    Handle (unbound) used to interact with the agent.

Source code in academy/launcher.py
def launch(
    self,
    behavior: BehaviorT,
    exchange: Exchange,
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    name: str | None = None,
) -> RemoteHandle[BehaviorT]:
    """Start a new agent that runs the given behavior.

    Args:
        behavior: Behavior the agent should implement.
        exchange: Exchange the agent will use for messaging.
        agent_id: ID to use for the launched agent. When `None`, the
            agent is registered with the exchange to obtain a new ID.
        name: Readable name of the agent. Only used when registering
            a new ID, i.e., ignored if `agent_id` is provided.

    Returns:
        Handle (unbound) used to interact with the agent.
    """
    if agent_id is None:
        # No ID supplied: register with the exchange to get one.
        agent_id = exchange.register_agent(type(behavior), name=name)

    # Record the control block before launching so the agent can be
    # looked up by ID afterwards.
    self._acbs[agent_id] = _ACB(
        agent_id,
        behavior,
        exchange,
        done=threading.Event(),
    )
    self._launch(agent_id)

    return exchange.get_handle(agent_id)

running

running() -> set[AgentId[Any]]

Get a set of IDs for all running agents.

Returns:

  • set[AgentId[Any]]

    Set of agent IDs corresponding to all agents launched by this launcher that have not completed yet.

Source code in academy/launcher.py
def running(self) -> set[AgentId[Any]]:
    """Return the IDs of the agents that are still running.

    Returns:
        IDs of all agents launched by this launcher whose completion
        event has not been set yet.
    """
    return {
        acb.agent_id
        for acb in self._acbs.values()
        if not acb.done.is_set()
    }

wait

wait(
    agent_id: AgentId[Any],
    *,
    ignore_error: bool = False,
    timeout: float | None = None
) -> None

Wait for a launched agent to exit.

Note

Calling wait() is only valid after launch() has succeeded.

Parameters:

  • agent_id (AgentId[Any]) –

    ID of launched agent.

  • ignore_error (bool, default: False ) –

    Ignore any errors raised by the agent.

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for agent.

Raises:

  • BadEntityIdError

    If an agent with agent_id was not launched by this launcher.

  • TimeoutError

    If timeout was exceeded while waiting for agent.

  • Exception

    Any exception raised by the agent if ignore_error=False.

Source code in academy/launcher.py
def wait(
    self,
    agent_id: AgentId[Any],
    *,
    ignore_error: bool = False,
    timeout: float | None = None,
) -> None:
    """Block until a launched agent has exited.

    Note:
        Calling `wait()` is only valid after `launch()` has succeeded.

    Args:
        agent_id: ID of launched agent.
        ignore_error: When true, do not re-raise an error raised by
            the agent.
        timeout: Optional timeout in seconds to wait for agent.

    Raises:
        BadEntityIdError: If an agent with `agent_id` was not
            launched by this launcher.
        TimeoutError: If `timeout` was exceeded while waiting for agent.
        Exception: Any exception raised by the agent if
            `ignore_error=False`.
    """
    try:
        acb = self._acbs[agent_id]
    except KeyError:
        raise BadEntityIdError(agent_id) from None

    finished = acb.done.wait(timeout)
    if not finished:
        raise TimeoutError(
            f'Agent did not complete within {timeout}s timeout '
            f'({acb.agent_id})',
        )

    future = acb.future
    # The future is only missing in the window between building the
    # control block in launch() and submitting the work in _launch().
    assert future is not None
    # The done event is expected to be set only from the future's
    # callback, so the future must have completed by now.
    assert future.done()

    if ignore_error:
        return

    error = future.exception()
    if error is not None:
        raise error

ThreadLauncher

ThreadLauncher(
    max_workers: int | None = None, *, max_restarts: int = 0
)

Bases: Launcher

Launcher that wraps a default concurrent.futures.ThreadPoolExecutor.

Parameters:

  • max_workers (int | None, default: None ) –

    The maximum number of threads (i.e., agents) in the pool.

  • max_restarts (int, default: 0 ) –

    Maximum number of times to restart an agent if it exits with an error.

Source code in academy/launcher.py
def __init__(
    self,
    max_workers: int | None = None,
    *,
    max_restarts: int = 0,
) -> None:
    """Initialize a launcher that runs agents in a thread pool.

    Args:
        max_workers: Maximum number of threads in the pool, passed
            through to `ThreadPoolExecutor`.
        max_restarts: Maximum number of restarts, forwarded to the
            base launcher.
    """
    super().__init__(
        ThreadPoolExecutor(
            thread_name_prefix='launcher',
            max_workers=max_workers,
        ),
        max_restarts=max_restarts,
        # This launcher always passes False for close_exchange.
        close_exchange=False,
    )

close

close() -> None

Close the launcher.

Warning

This will not return until all agents have exited. It is the caller's responsibility to shut down agents prior to closing the launcher.

Source code in academy/launcher.py
def close(self) -> None:
    """Close the launcher.

    Errors from agents that have already finished are re-raised so the
    caller is sure to see them. The executor is shut down even when
    such an error is raised, so its workers are not leaked.

    Warning:
        This will not return until all agents have exited. It is the
        caller's responsibility to shut down agents prior to closing
        the launcher.

    Raises:
        Exception: The first error found on the future of an agent
            that has already completed.
    """
    logger.debug('Waiting for agents to shutdown...')
    try:
        for acb in self._acbs.values():
            if acb.done.is_set() and acb.future is not None:
                # Raise possible errors from agents so user is sure
                # to see them.
                acb.future.result()
    finally:
        # Always release the executor, even when an agent error is
        # propagating; otherwise the shutdown would be skipped.
        self._executor.shutdown(wait=True, cancel_futures=True)
    logger.debug('Closed launcher (%s)', self)

launch

launch(
    behavior: BehaviorT,
    exchange: Exchange,
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    name: str | None = None
) -> RemoteHandle[BehaviorT]

Launch a new agent with a specified behavior.

Parameters:

  • behavior (BehaviorT) –

    Behavior the agent should implement.

  • exchange (Exchange) –

    Exchange the agent will use for messaging.

  • agent_id (AgentId[BehaviorT] | None, default: None ) –

    Specify ID of the launched agent. If None, a new agent ID will be created within the exchange.

  • name (str | None, default: None ) –

    Readable name of the agent. Ignored if agent_id is provided.

Returns:

  • RemoteHandle[BehaviorT]

    Handle (unbound) used to interact with the agent.

Source code in academy/launcher.py
def launch(
    self,
    behavior: BehaviorT,
    exchange: Exchange,
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    name: str | None = None,
) -> RemoteHandle[BehaviorT]:
    """Start a new agent that runs the given behavior.

    Args:
        behavior: Behavior the agent should implement.
        exchange: Exchange the agent will use for messaging.
        agent_id: ID to use for the launched agent. When `None`, the
            agent is registered with the exchange to obtain a new ID.
        name: Readable name of the agent. Only used when registering
            a new ID, i.e., ignored if `agent_id` is provided.

    Returns:
        Handle (unbound) used to interact with the agent.
    """
    if agent_id is None:
        # No ID supplied: register with the exchange to get one.
        agent_id = exchange.register_agent(type(behavior), name=name)

    # Record the control block before launching so the agent can be
    # looked up by ID afterwards.
    self._acbs[agent_id] = _ACB(
        agent_id,
        behavior,
        exchange,
        done=threading.Event(),
    )
    self._launch(agent_id)

    return exchange.get_handle(agent_id)

running

running() -> set[AgentId[Any]]

Get a set of IDs for all running agents.

Returns:

  • set[AgentId[Any]]

    Set of agent IDs corresponding to all agents launched by this launcher that have not completed yet.

Source code in academy/launcher.py
def running(self) -> set[AgentId[Any]]:
    """Return the IDs of the agents that are still running.

    Returns:
        IDs of all agents launched by this launcher whose completion
        event has not been set yet.
    """
    return {
        acb.agent_id
        for acb in self._acbs.values()
        if not acb.done.is_set()
    }

wait

wait(
    agent_id: AgentId[Any],
    *,
    ignore_error: bool = False,
    timeout: float | None = None
) -> None

Wait for a launched agent to exit.

Note

Calling wait() is only valid after launch() has succeeded.

Parameters:

  • agent_id (AgentId[Any]) –

    ID of launched agent.

  • ignore_error (bool, default: False ) –

    Ignore any errors raised by the agent.

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for agent.

Raises:

  • BadEntityIdError

    If an agent with agent_id was not launched by this launcher.

  • TimeoutError

    If timeout was exceeded while waiting for agent.

  • Exception

    Any exception raised by the agent if ignore_error=False.

Source code in academy/launcher.py
def wait(
    self,
    agent_id: AgentId[Any],
    *,
    ignore_error: bool = False,
    timeout: float | None = None,
) -> None:
    """Block until a launched agent has exited.

    Note:
        Calling `wait()` is only valid after `launch()` has succeeded.

    Args:
        agent_id: ID of launched agent.
        ignore_error: When true, do not re-raise an error raised by
            the agent.
        timeout: Optional timeout in seconds to wait for agent.

    Raises:
        BadEntityIdError: If an agent with `agent_id` was not
            launched by this launcher.
        TimeoutError: If `timeout` was exceeded while waiting for agent.
        Exception: Any exception raised by the agent if
            `ignore_error=False`.
    """
    try:
        acb = self._acbs[agent_id]
    except KeyError:
        raise BadEntityIdError(agent_id) from None

    finished = acb.done.wait(timeout)
    if not finished:
        raise TimeoutError(
            f'Agent did not complete within {timeout}s timeout '
            f'({acb.agent_id})',
        )

    future = acb.future
    # The future is only missing in the window between building the
    # control block in launch() and submitting the work in _launch().
    assert future is not None
    # The done event is expected to be set only from the future's
    # callback, so the future must have completed by now.
    assert future.done()

    if ignore_error:
        return

    error = future.exception()
    if error is not None:
        raise error