academy.handle

Handle

Bases: Protocol[AgentT]

Agent handle protocol.

A handle enables an agent or user to invoke actions on another agent.

agent_id property

agent_id: AgentId[AgentT]

ID of the agent this is a handle to.

client_id property

client_id: EntityId

ID of the client for this handle.

action async

action(
    action: str, /, *args: Any, **kwargs: Any
) -> Future[R]

Invoke an action on the agent.

Parameters:

  • action (str) –

    Action to invoke.

  • args (Any, default: () ) –

    Positional arguments for the action.

  • kwargs (Any, default: {} ) –

    Keyword arguments for the action.

Returns:

  • Future[R]

    Future to the result of the action.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shut down for another reason (it self-terminated or was shut down via another handle).

  • HandleClosedError

    If the handle was closed.

Source code in academy/handle.py
async def action(
    self,
    action: str,
    /,
    *args: Any,
    **kwargs: Any,
) -> asyncio.Future[R]:
    """Invoke an action on the agent.

    Args:
        action: Action to invoke.
        args: Positional arguments for the action.
        kwargs: Keyword arguments for the action.

    Returns:
        Future to the result of the action.

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shut down for another reason
            (it self-terminated or was shut down via another handle).
        HandleClosedError: If the handle was closed.
    """
    ...
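
Because `action` is a coroutine that returns a future, a caller awaits twice: once to send the request and once to obtain the result. The sketch below illustrates that calling pattern. `EchoHandle` is a hypothetical stand-in written for this example; it is not part of academy and only mimics the `action` signature shown above.

```python
from __future__ import annotations

import asyncio
from typing import Any


class EchoHandle:
    """Hypothetical stand-in that mimics the Handle.action signature."""

    async def action(
        self,
        action: str,
        /,
        *args: Any,
        **kwargs: Any,
    ) -> asyncio.Future[Any]:
        # Create a future on the running loop and resolve it immediately
        # with the arguments it was called with.
        future: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
        future.set_result((action, args, kwargs))
        return future


async def main() -> Any:
    handle = EchoHandle()
    # First await: the request is issued and a future is returned.
    future = await handle.action('greet', 'world', punctuation='!')
    # Second await: wait for the result of the action.
    return await future


result = asyncio.run(main())
print(result)
```

Keeping the future separate from the send step lets a caller issue several requests first and collect the results later.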

close async

close(
    wait_futures: bool = True,
    *,
    timeout: float | None = None
) -> None

Close this handle.

Parameters:

  • wait_futures (bool, default: True ) –

    Wait to return until all pending futures are done executing. If False, pending futures are cancelled.

  • timeout (float | None, default: None ) –

    Optional timeout used when wait_futures=True.

Source code in academy/handle.py
async def close(
    self,
    wait_futures: bool = True,
    *,
    timeout: float | None = None,
) -> None:
    """Close this handle.

    Args:
        wait_futures: Wait to return until all pending futures are done
            executing. If `False`, pending futures are cancelled.
        timeout: Optional timeout used when `wait_futures=True`.
    """
    ...

ping async

ping(*, timeout: float | None = None) -> float

Ping the agent.

Ping the agent and wait to get a response. Agents process messages in order so the round-trip time will include processing time of earlier messages in the queue.

Parameters:

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for the response.

Returns:

  • float

    Round-trip time in seconds.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shut down for another reason (it self-terminated or was shut down via another handle).

  • HandleClosedError

    If the handle was closed.

  • TimeoutError

    If the timeout is exceeded.

Source code in academy/handle.py
async def ping(self, *, timeout: float | None = None) -> float:
    """Ping the agent.

    Ping the agent and wait to get a response. Agents process messages
    in order so the round-trip time will include processing time of
    earlier messages in the queue.

    Args:
        timeout: Optional timeout in seconds to wait for the response.

    Returns:
        Round-trip time in seconds.

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shut down for another reason
            (it self-terminated or was shut down via another handle).
        HandleClosedError: If the handle was closed.
        TimeoutError: If the timeout is exceeded.
    """
    ...

shutdown async

shutdown(*, terminate: bool | None = None) -> None

Instruct the agent to shut down.

This is non-blocking and will only send the message.

Parameters:

  • terminate (bool | None, default: None ) –

    Override the termination behavior of the agent defined in the RuntimeConfig.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shut down for another reason (it self-terminated or was shut down via another handle).

  • HandleClosedError

    If the handle was closed.

Source code in academy/handle.py
async def shutdown(self, *, terminate: bool | None = None) -> None:
    """Instruct the agent to shut down.

    This is non-blocking and will only send the message.

    Args:
        terminate: Override the termination behavior of the agent
            defined in the [`RuntimeConfig`][academy.runtime.RuntimeConfig].

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shut down for another reason
            (it self-terminated or was shut down via another handle).
        HandleClosedError: If the handle was closed.
    """
    ...

HandleDict

HandleDict(
    values: (
        Mapping[K, Handle[AgentT]]
        | Iterable[tuple[K, Handle[AgentT]]]
    ) = (),
    /,
    **kwargs: dict[str, Handle[AgentT]],
)

Bases: dict[K, Handle[AgentT]]

Dictionary mapping keys to handles.

Tip

HandleDict is required when storing a mapping of handles as an attribute of an Agent, so that those handles get bound to the correct agent when running.

Source code in academy/handle.py
def __init__(
    self,
    values: Mapping[K, Handle[AgentT]]
    | Iterable[tuple[K, Handle[AgentT]]] = (),
    /,
    **kwargs: dict[str, Handle[AgentT]],
) -> None:
    super().__init__(values, **kwargs)

HandleList

HandleList(iterable: Iterable[Handle[AgentT]] = ())

Bases: list[Handle[AgentT]]

List of handles.

Tip

HandleList is required when storing a list of handles as an attribute of an Agent, so that those handles get bound to the correct agent when running.

Source code in academy/handle.py
def __init__(
    self,
    iterable: Iterable[Handle[AgentT]] = (),
    /,
) -> None:
    super().__init__(iterable)

ProxyHandle

ProxyHandle(agent: AgentT)

Bases: Generic[AgentT]

Proxy handle.

A proxy handle is a thin wrapper around an Agent instance. It is useful for testing agents that are initialized with a handle to another agent without needing to spawn agents. The wrapper invokes actions synchronously: the action runs to completion before action() returns its future.

Source code in academy/handle.py
def __init__(self, agent: AgentT) -> None:
    self.agent = agent
    self.agent_id: AgentId[AgentT] = AgentId.new()
    self.client_id: EntityId = UserId.new()
    self._agent_closed = False
    self._handle_closed = False

action async

action(
    action: str, /, *args: Any, **kwargs: Any
) -> Future[R]

Invoke an action on the agent.

Parameters:

  • action (str) –

    Action to invoke.

  • args (Any, default: () ) –

    Positional arguments for the action.

  • kwargs (Any, default: {} ) –

    Keyword arguments for the action.

Returns:

  • Future[R]

    Future to the result of the action.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shut down for another reason (it self-terminated or was shut down via another handle).

  • HandleClosedError

    If the handle was closed.

Source code in academy/handle.py
async def action(
    self,
    action: str,
    /,
    *args: Any,
    **kwargs: Any,
) -> asyncio.Future[R]:
    """Invoke an action on the agent.

    Args:
        action: Action to invoke.
        args: Positional arguments for the action.
        kwargs: Keyword arguments for the action.

    Returns:
        Future to the result of the action.

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shut down for another reason
            (it self-terminated or was shut down via another handle).
        HandleClosedError: If the handle was closed.
    """
    if self._agent_closed:
        raise AgentTerminatedError(self.agent_id)
    elif self._handle_closed:
        raise HandleClosedError(self.agent_id, self.client_id)

    future: asyncio.Future[R] = asyncio.get_running_loop().create_future()
    try:
        method = getattr(self.agent, action)
        result = await method(*args, **kwargs)
    except Exception as e:
        future.set_exception(e)
    else:
        future.set_result(result)
    return future
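
The `try`/`except`/`else` block above means that an exception raised by the wrapped method is stored in the returned future instead of being raised by `action` itself. The following sketch reproduces just that logic to show where the error surfaces. `MiniProxy` and `Counter` are hypothetical names written for this example; neither is part of academy.

```python
from __future__ import annotations

import asyncio
from typing import Any


class Counter:
    """Hypothetical agent-like object; not an academy Agent."""

    def __init__(self) -> None:
        self.count = 0

    async def add(self, value: int) -> int:
        if value < 0:
            raise ValueError('value must be non-negative')
        self.count += value
        return self.count


class MiniProxy:
    """Stand-in reproducing the try/except/else logic of ProxyHandle.action."""

    def __init__(self, agent: Any) -> None:
        self.agent = agent

    async def action(
        self,
        action: str,
        /,
        *args: Any,
        **kwargs: Any,
    ) -> asyncio.Future[Any]:
        future: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
        try:
            method = getattr(self.agent, action)
            result = await method(*args, **kwargs)
        except Exception as e:
            future.set_exception(e)
        else:
            future.set_result(result)
        return future


async def main() -> tuple[int, str]:
    handle = MiniProxy(Counter())
    total = await (await handle.action('add', 5))
    # The failing call does not raise here; the error is stored in the future.
    failed = await handle.action('add', -1)
    message = ''
    try:
        await failed
    except ValueError as e:
        message = str(e)
    return total, message


total, message = asyncio.run(main())
print(total, message)
```

A test that only awaits `action` and never awaits the returned future would therefore miss errors raised by the action.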

close async

close(
    wait_futures: bool = True,
    *,
    timeout: float | None = None
) -> None

Close this handle.

Note

This is a no-op for proxy handles.

Parameters:

  • wait_futures (bool, default: True ) –

    Wait to return until all pending futures are done executing. If False, pending futures are cancelled.

  • timeout (float | None, default: None ) –

    Optional timeout used when wait_futures=True.

Source code in academy/handle.py
async def close(
    self,
    wait_futures: bool = True,
    *,
    timeout: float | None = None,
) -> None:
    """Close this handle.

    Note:
        This is a no-op for proxy handles.

    Args:
        wait_futures: Wait to return until all pending futures are done
            executing. If `False`, pending futures are cancelled.
        timeout: Optional timeout used when `wait_futures=True`.
    """
    self._handle_closed = True

ping async

ping(*, timeout: float | None = None) -> float

Ping the agent.

Ping the agent and wait to get a response. Agents process messages in order so the round-trip time will include processing time of earlier messages in the queue.

Note

This is a no-op for proxy handles and returns 0 latency.

Parameters:

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for the response.

Returns:

  • float

    Round-trip time in seconds.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shut down for another reason (it self-terminated or was shut down via another handle).

  • HandleClosedError

    If the handle was closed.

  • TimeoutError

    If the timeout is exceeded.

Source code in academy/handle.py
async def ping(self, *, timeout: float | None = None) -> float:
    """Ping the agent.

    Ping the agent and wait to get a response. Agents process messages
    in order so the round-trip time will include processing time of
    earlier messages in the queue.

    Note:
        This is a no-op for proxy handles and returns 0 latency.

    Args:
        timeout: Optional timeout in seconds to wait for the response.

    Returns:
        Round-trip time in seconds.

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shut down for another reason
            (it self-terminated or was shut down via another handle).
        HandleClosedError: If the handle was closed.
        TimeoutError: If the timeout is exceeded.
    """
    if self._agent_closed:
        raise AgentTerminatedError(self.agent_id)
    elif self._handle_closed:
        raise HandleClosedError(self.agent_id, self.client_id)
    return 0

shutdown async

shutdown(*, terminate: bool | None = None) -> None

Instruct the agent to shut down.

This is non-blocking and will only send the message.

Parameters:

  • terminate (bool | None, default: None ) –

    Override the termination behavior of the agent defined in the RuntimeConfig.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shut down for another reason (it self-terminated or was shut down via another handle).

  • HandleClosedError

    If the handle was closed.

Source code in academy/handle.py
async def shutdown(self, *, terminate: bool | None = None) -> None:
    """Instruct the agent to shut down.

    This is non-blocking and will only send the message.

    Args:
        terminate: Override the termination behavior of the agent
            defined in the [`RuntimeConfig`][academy.runtime.RuntimeConfig].

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shut down for another reason
            (it self-terminated or was shut down via another handle).
        HandleClosedError: If the handle was closed.
    """
    if self._agent_closed:
        raise AgentTerminatedError(self.agent_id)
    elif self._handle_closed:
        raise HandleClosedError(self.agent_id, self.client_id)
    self._agent_closed = True if terminate is None else terminate

UnboundRemoteHandle

UnboundRemoteHandle(agent_id: AgentId[AgentT])

Bases: Generic[AgentT]

Handle to a remote agent that is not bound to a mailbox.

Warning

An unbound handle must be bound before use. Otherwise all methods will raise a HandleNotBoundError when attempting to send a message to the remote agent.

Parameters:

  • agent_id (AgentId[AgentT]) –

    ID of the agent this is a handle to.

Source code in academy/handle.py
def __init__(self, agent_id: AgentId[AgentT]) -> None:
    self.agent_id = agent_id

client_id property

client_id: EntityId

Raises RuntimeError when unbound.

bind_to_client

bind_to_client(
    client: ExchangeClient[Any],
) -> RemoteHandle[AgentT]

Bind the handle to an existing mailbox.

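Parameters:

  • client (ExchangeClient[Any]) –

    Exchange client.

Returns:

  • RemoteHandle[AgentT]

    Remote handle bound to the exchange client.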

Source code in academy/handle.py
def bind_to_client(
    self,
    client: ExchangeClient[Any],
) -> RemoteHandle[AgentT]:
    """Bind the handle to an existing mailbox.

    Args:
        client: Exchange client.

    Returns:
        Remote handle bound to the exchange client.
    """
    return client.get_handle(self.agent_id)

action async

action(
    action: str, /, *args: Any, **kwargs: Any
) -> Future[R]

Raises HandleNotBoundError.

Source code in academy/handle.py
async def action(
    self,
    action: str,
    /,
    *args: Any,
    **kwargs: Any,
) -> asyncio.Future[R]:
    """Raises [`HandleNotBoundError`][academy.exception.HandleNotBoundError]."""  # noqa: E501
    raise HandleNotBoundError(self.agent_id)

close async

close(
    wait_futures: bool = True,
    *,
    timeout: float | None = None
) -> None

Raises HandleNotBoundError.

Source code in academy/handle.py
async def close(
    self,
    wait_futures: bool = True,
    *,
    timeout: float | None = None,
) -> None:
    """Raises [`HandleNotBoundError`][academy.exception.HandleNotBoundError]."""  # noqa: E501
    raise HandleNotBoundError(self.agent_id)

ping async

ping(*, timeout: float | None = None) -> float

Raises HandleNotBoundError.

Source code in academy/handle.py
async def ping(self, *, timeout: float | None = None) -> float:
    """Raises [`HandleNotBoundError`][academy.exception.HandleNotBoundError]."""  # noqa: E501
    raise HandleNotBoundError(self.agent_id)

shutdown async

shutdown(*, terminate: bool | None = None) -> None

Raises HandleNotBoundError.

Source code in academy/handle.py
async def shutdown(self, *, terminate: bool | None = None) -> None:
    """Raises [`HandleNotBoundError`][academy.exception.HandleNotBoundError]."""  # noqa: E501
    raise HandleNotBoundError(self.agent_id)

RemoteHandle

RemoteHandle(
    exchange: ExchangeClient[Any], agent_id: AgentId[AgentT]
)

Bases: Generic[AgentT]

Handle to a remote agent bound to an exchange client.

Parameters:

  • exchange (ExchangeClient[Any]) –

    Exchange client used for agent communication.

  • agent_id (AgentId[AgentT]) –

    EntityId of the target agent of this handle.

Source code in academy/handle.py
def __init__(
    self,
    exchange: ExchangeClient[Any],
    agent_id: AgentId[AgentT],
) -> None:
    self.exchange = exchange
    self.agent_id = agent_id
    self.client_id = exchange.client_id

    if self.agent_id == self.client_id:
        raise ValueError(
            'Cannot create handle to self. The IDs of the exchange '
            f'client and the target agent are the same: {self.agent_id}.',
        )
    # Unique identifier for each handle object; used to disambiguate
    # messages when multiple handles are bound to the same mailbox.
    self.handle_id = uuid.uuid4()

    self._futures: dict[uuid.UUID, asyncio.Future[Any]] = {}
    self._closed = False

clone

clone() -> UnboundRemoteHandle[AgentT]

Create an unbound copy of this handle.

Source code in academy/handle.py
def clone(self) -> UnboundRemoteHandle[AgentT]:
    """Create an unbound copy of this handle."""
    return UnboundRemoteHandle(self.agent_id)

close async

close(
    wait_futures: bool = True,
    *,
    timeout: float | None = None
) -> None

Close this handle.

Note

This does not close the exchange client.

Parameters:

  • wait_futures (bool, default: True ) –

    Wait to return until all pending futures are done executing. If False, pending futures are cancelled.

  • timeout (float | None, default: None ) –

    Optional timeout used when wait_futures=True.

Source code in academy/handle.py
async def close(
    self,
    wait_futures: bool = True,
    *,
    timeout: float | None = None,
) -> None:
    """Close this handle.

    Note:
        This does not close the exchange client.

    Args:
        wait_futures: Wait to return until all pending futures are done
            executing. If `False`, pending futures are cancelled.
        timeout: Optional timeout used when `wait_futures=True`.
    """
    self._closed = True

    if len(self._futures) == 0:
        return
    if wait_futures:
        logger.debug('Waiting on pending futures for %s', self)
        await asyncio.wait(
            list(self._futures.values()),
            timeout=timeout,
        )
    else:
        logger.debug('Cancelling pending futures for %s', self)
        for future in self._futures:
            self._futures[future].cancel()
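
One consequence of the `asyncio.wait` call above is that an elapsed timeout does not raise: the wait simply returns while some futures may still be pending. The sketch below demonstrates that standard-library behavior, and the cancellation used by the `wait_futures=False` branch, with plain futures. It does not use academy, and the variable names are chosen only for this example.

```python
import asyncio


async def main() -> tuple[int, int, bool]:
    loop = asyncio.get_running_loop()
    finished = loop.create_future()
    finished.set_result('ok')
    unfinished = loop.create_future()  # never resolved in this sketch

    # asyncio.wait does not raise on timeout; it reports what is still pending.
    done, pending = await asyncio.wait([finished, unfinished], timeout=0.01)

    # The wait_futures=False branch cancels pending futures instead of waiting.
    unfinished.cancel()
    return len(done), len(pending), unfinished.cancelled()


n_done, n_pending, was_cancelled = asyncio.run(main())
print(n_done, n_pending, was_cancelled)
```

A caller that needs to know whether everything completed within the timeout would have to check the futures it still holds after `close` returns.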

closed

closed() -> bool

Check if the handle has been closed.

Source code in academy/handle.py
def closed(self) -> bool:
    """Check if the handle has been closed."""
    return self._closed

action async

action(
    action: str, /, *args: Any, **kwargs: Any
) -> Future[R]

Invoke an action on the agent.

Parameters:

  • action (str) –

    Action to invoke.

  • args (Any, default: () ) –

    Positional arguments for the action.

  • kwargs (Any, default: {} ) –

    Keyword arguments for the action.

Returns:

  • Future[R]

    Future to the result of the action.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shut down for another reason (it self-terminated or was shut down via another handle).

  • HandleClosedError

    If the handle was closed.

Source code in academy/handle.py
async def action(
    self,
    action: str,
    /,
    *args: Any,
    **kwargs: Any,
) -> asyncio.Future[R]:
    """Invoke an action on the agent.

    Args:
        action: Action to invoke.
        args: Positional arguments for the action.
        kwargs: Keyword arguments for the action.

    Returns:
        Future to the result of the action.

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shut down for another reason
            (it self-terminated or was shut down via another handle).
        HandleClosedError: If the handle was closed.
    """
    if self._closed:
        raise HandleClosedError(self.agent_id, self.client_id)

    request = ActionRequest(
        src=self.client_id,
        dest=self.agent_id,
        label=self.handle_id,
        action=action,
        pargs=args,
        kargs=kwargs,
    )
    loop = asyncio.get_running_loop()
    future: asyncio.Future[R] = loop.create_future()
    self._futures[request.tag] = future
    await self.exchange.send(request)
    logger.debug(
        'Sent action request from %s to %s (action=%r)',
        self.client_id,
        self.agent_id,
        action,
    )
    return future
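
The listing above stores each future under `request.tag` before sending, which suggests that a later response is matched back to its future by that tag. The response path is not shown here, so the sketch below is an assumption about how such correlation can work rather than academy's implementation. `PendingRequests`, `register`, and `resolve` are hypothetical names.

```python
from __future__ import annotations

import asyncio
import uuid
from typing import Any


class PendingRequests:
    """Hypothetical registry pairing request tags with futures."""

    def __init__(self) -> None:
        self._futures: dict[uuid.UUID, asyncio.Future[Any]] = {}

    def register(self) -> tuple[uuid.UUID, asyncio.Future[Any]]:
        # Mirror the send path: create a future and store it under a new tag.
        tag = uuid.uuid4()
        future: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
        self._futures[tag] = future
        return tag, future

    def resolve(self, tag: uuid.UUID, result: Any) -> bool:
        # Assumed response path: look up the future by tag and complete it.
        future = self._futures.pop(tag, None)
        if future is None or future.done():
            return False
        future.set_result(result)
        return True


async def main() -> tuple[Any, Any, bool]:
    pending = PendingRequests()
    tag_a, fut_a = pending.register()
    tag_b, fut_b = pending.register()
    # Responses may arrive in any order; the tag selects the right future.
    pending.resolve(tag_b, 'second')
    pending.resolve(tag_a, 'first')
    unknown = pending.resolve(uuid.uuid4(), 'ignored')
    return await fut_a, await fut_b, unknown


first, second, unknown = asyncio.run(main())
print(first, second, unknown)
```

Keying by a per-request tag is what allows several requests from the same handle to be in flight at once without their results being confused.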

ping async

ping(*, timeout: float | None = None) -> float

Ping the agent.

Ping the agent and wait to get a response. Agents process messages in order so the round-trip time will include processing time of earlier messages in the queue.

Parameters:

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for the response.

Returns:

  • float

    Round-trip time in seconds.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shut down for another reason (it self-terminated or was shut down via another handle).

  • HandleClosedError

    If the handle was closed.

  • TimeoutError

    If the timeout is exceeded.

Source code in academy/handle.py
async def ping(self, *, timeout: float | None = None) -> float:
    """Ping the agent.

    Ping the agent and wait to get a response. Agents process messages
    in order so the round-trip time will include processing time of
    earlier messages in the queue.

    Args:
        timeout: Optional timeout in seconds to wait for the response.

    Returns:
        Round-trip time in seconds.

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shut down for another reason
            (it self-terminated or was shut down via another handle).
        HandleClosedError: If the handle was closed.
        TimeoutError: If the timeout is exceeded.
    """
    if self._closed:
        raise HandleClosedError(self.agent_id, self.client_id)

    request = PingRequest(
        src=self.client_id,
        dest=self.agent_id,
        label=self.handle_id,
    )
    loop = asyncio.get_running_loop()
    future: asyncio.Future[None] = loop.create_future()
    self._futures[request.tag] = future
    start = time.perf_counter()
    await self.exchange.send(request)
    logger.debug('Sent ping from %s to %s', self.client_id, self.agent_id)

    done, pending = await asyncio.wait({future}, timeout=timeout)
    if future in pending:
        raise TimeoutError(
            f'Did not receive ping response within {timeout} seconds.',
        )
    elapsed = time.perf_counter() - start
    logger.debug(
        'Received ping from %s to %s in %.1f ms',
        self.client_id,
        self.agent_id,
        elapsed * 1000,
    )
    return elapsed
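
The timing logic above combines `time.perf_counter` with `asyncio.wait` and converts a still-pending future into a `TimeoutError`. The sketch below isolates that pattern without any exchange or agent. `timed_wait` is a hypothetical helper, and the `delay` parameter merely simulates when a response would arrive.

```python
from __future__ import annotations

import asyncio
import time


async def timed_wait(delay: float, timeout: float | None) -> float:
    """Hypothetical helper: resolve a future after `delay`, wait up to `timeout`."""
    loop = asyncio.get_running_loop()
    future: asyncio.Future[None] = loop.create_future()
    # Simulate a response arriving after `delay` seconds.
    timer = loop.call_later(delay, future.set_result, None)
    start = time.perf_counter()
    done, pending = await asyncio.wait({future}, timeout=timeout)
    if future in pending:
        timer.cancel()
        raise TimeoutError(f'No response within {timeout} seconds.')
    return time.perf_counter() - start


async def main() -> tuple[float, bool]:
    elapsed = await timed_wait(delay=0.01, timeout=1.0)
    try:
        await timed_wait(delay=5.0, timeout=0.01)
    except TimeoutError:
        timed_out = True
    else:
        timed_out = False
    return elapsed, timed_out


elapsed, timed_out = asyncio.run(main())
print(f'{elapsed * 1000:.1f} ms, timed_out={timed_out}')
```

Because the clock starts before the wait and stops only after the future completes, the measured value includes any time the response spent queued behind earlier work.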

shutdown async

shutdown(*, terminate: bool | None = None) -> None

Instruct the agent to shut down.

This is non-blocking and will only send the message.

Parameters:

  • terminate (bool | None, default: None ) –

    Override the termination behavior of the agent defined in the RuntimeConfig.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shut down for another reason (it self-terminated or was shut down via another handle).

  • HandleClosedError

    If the handle was closed.

Source code in academy/handle.py
async def shutdown(self, *, terminate: bool | None = None) -> None:
    """Instruct the agent to shut down.

    This is non-blocking and will only send the message.

    Args:
        terminate: Override the termination behavior of the agent
            defined in the [`RuntimeConfig`][academy.runtime.RuntimeConfig].

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shut down for another reason
            (it self-terminated or was shut down via another handle).
        HandleClosedError: If the handle was closed.
    """
    if self._closed:
        raise HandleClosedError(self.agent_id, self.client_id)

    request = ShutdownRequest(
        src=self.client_id,
        dest=self.agent_id,
        label=self.handle_id,
        terminate=terminate,
    )
    await self.exchange.send(request)
    logger.debug(
        'Sent shutdown request from %s to %s',
        self.client_id,
        self.agent_id,
    )