Skip to content

academy.handle

Handle

Handle(
    agent_id: AgentId[AgentT],
    *,
    exchange: ExchangeClient[Any] | None = None,
    ignore_context: bool = False
)

Bases: Generic[AgentT]

Handle to a remote agent.

Internally, handles use an ExchangeClient to send requests to and receive responses from the remote agent. By default the correct exchange client is inferred from the context using a context variable (specifically, the academy.handle.exchange_context variable). This allows the same handle to be used in different contexts, automatically using the correct client to send messages.

When a handle is used in contexts that have not configured the exchange client (such as outside of an agent runtime or Manager), a default exchange can be provided via the exchange argument. For advanced usage, the ignore_context flag will cause the handle to only use the exchange argument no matter what the current context is.

Note

The exchange argument will not be included when a handle is pickled. Thus, unpickled handles must be used in a context that configures an exchange client.

Parameters:

  • agent_id (AgentId[AgentT]) –

    ID of the remote agent.

  • exchange (ExchangeClient[Any] | None, default: None ) –

    A default exchange client to be used if an exchange client is not configured in the current context.

  • ignore_context (bool, default: False ) –

    Ignore the current context and force use of exchange for communication.

Raises:

  • ValueError

    If ignore_context=True but exchange is not provided.

Source code in academy/handle.py
def __init__(
    self,
    agent_id: AgentId[AgentT],
    *,
    exchange: ExchangeClient[Any] | None = None,
    ignore_context: bool = False,
) -> None:
    self.agent_id = agent_id
    self._exchange = exchange
    self._registered_exchanges: WeakSet[ExchangeClient[Any]] = WeakSet()
    self.ignore_context = ignore_context

    if ignore_context and not exchange:
        raise ValueError(
            'Cannot initialize handle with ignore_context=True '
            'and no explicit exchange.',
        )

    # Unique identifier for each handle object; used to disambiguate
    # messages when multiple handles are bound to the same mailbox.
    self.handle_id = uuid.uuid4()
    self._pending_response_futures: dict[
        uuid.UUID,
        asyncio.Future[Any],
    ] = {}
    self._shutdown_requests: set[uuid.UUID] = set()

    if self._exchange is not None:
        self._register_with_exchange(self._exchange)

exchange property

exchange: ExchangeClient[Any]

Exchange client used to send messages.

Returns:

Raises:

action async

action(action: str, /, *args: Any, **kwargs: Any) -> R

Invoke an action on the agent.

Parameters:

  • action (str) –

    Action to invoke.

  • args (Any, default: () ) –

    Positional arguments for the action.

  • kwargs (Any, default: {} ) –

    Keywords arguments for the action.

Returns:

  • R

    Result of the action.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shutdown for another reason (it self terminated or via another handle).

  • Exception

    Any exception raised by the action.

Source code in academy/handle.py
async def action(self, action: str, /, *args: Any, **kwargs: Any) -> R:
    """Invoke an action on the agent.

    Args:
        action: Action to invoke.
        args: Positional arguments for the action.
        kwargs: Keywords arguments for the action.

    Returns:
        Result of the action.

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shutdown for another reason
            (it self terminated or via another handle).
        Exception: Any exception raised by the action.
    """
    exchange = self.exchange
    self._register_with_exchange(exchange)

    request = Message.create(
        src=exchange.client_id,
        dest=self.agent_id,
        label=self.handle_id,
        body=ActionRequest(action=action, pargs=args, kargs=kwargs),
    )
    loop = asyncio.get_running_loop()
    future: asyncio.Future[R] = loop.create_future()
    self._pending_response_futures[request.tag] = future

    await self.exchange.send(request)
    logger.debug(
        'Sent action request from %s to %s (action=%r)',
        self.client_id,
        self.agent_id,
        action,
    )
    await future
    return future.result()

ping async

ping(*, timeout: float | None = None) -> float

Ping the agent.

Ping the agent and wait to get a response.

Parameters:

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for the response.

Returns:

  • float

    Round-trip time in seconds.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shutdown for another reason (it self terminated or via another handle).

  • TimeoutError

    If the timeout is exceeded.

Source code in academy/handle.py
async def ping(self, *, timeout: float | None = None) -> float:
    """Ping the agent.

    Ping the agent and wait to get a response.

    Args:
        timeout: Optional timeout in seconds to wait for the response.

    Returns:
        Round-trip time in seconds.

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shutdown for another reason
            (it self terminated or via another handle).
        TimeoutError: If the timeout is exceeded.
    """
    exchange = self.exchange
    self._register_with_exchange(exchange)

    request = Message.create(
        src=exchange.client_id,
        dest=self.agent_id,
        label=self.handle_id,
        body=PingRequest(),
    )
    loop = asyncio.get_running_loop()
    future: asyncio.Future[None] = loop.create_future()
    self._pending_response_futures[request.tag] = future
    start = time.perf_counter()
    await self.exchange.send(request)
    logger.debug('Sent ping from %s to %s', self.client_id, self.agent_id)

    done, pending = await asyncio.wait({future}, timeout=timeout)
    if future in pending:
        raise TimeoutError(
            f'Did not receive ping response within {timeout} seconds.',
        )
    elapsed = time.perf_counter() - start
    logger.debug(
        'Received ping from %s to %s in %.1f ms',
        exchange.client_id,
        self.agent_id,
        elapsed * 1000,
    )
    return elapsed

shutdown async

shutdown(*, terminate: bool | None = None) -> None

Instruct the agent to shutdown.

This is non-blocking and will only send the request. Any unexpected error responses sent by the exchange will be logged.

Parameters:

  • terminate (bool | None, default: None ) –

    Override the termination behavior of the agent defined in the RuntimeConfig.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shutdown for another reason (it self terminated or via another handle).

Source code in academy/handle.py
async def shutdown(self, *, terminate: bool | None = None) -> None:
    """Instruct the agent to shutdown.

    This is non-blocking and will only send the request. Any unexpected
    error responses sent by the exchange will be logged.

    Args:
        terminate: Override the termination behavior of the agent defined
            in the [`RuntimeConfig`][academy.runtime.RuntimeConfig].

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shutdown for another reason
            (it self terminated or via another handle).
    """
    exchange = self.exchange
    self._register_with_exchange(exchange)

    request = Message.create(
        src=exchange.client_id,
        dest=self.agent_id,
        label=self.handle_id,
        body=ShutdownRequest(terminate=terminate),
    )
    self._shutdown_requests.add(request.tag)
    await self.exchange.send(request)
    logger.debug(
        'Sent shutdown request from %s to %s',
        exchange.client_id,
        self.agent_id,
    )

ProxyHandle

ProxyHandle(agent: AgentT)

Bases: Handle[AgentT]

Proxy handle.

A proxy handle is thin wrapper around an Agent instance that is useful for testing agents that are initialized with a handle to another agent without needing to spawn agents. This wrapper invokes actions synchronously.

Source code in academy/handle.py
def __init__(self, agent: AgentT) -> None:
    self.agent = agent
    self.agent_id: AgentId[AgentT] = AgentId.new()
    self._agent_closed = False

exchange property

exchange: ExchangeClient[Any]

Exchange client used to send messages.

Returns:

Raises:

action async

action(action: str, /, *args: Any, **kwargs: Any) -> R

Invoke an action on the agent.

Parameters:

  • action (str) –

    Action to invoke.

  • args (Any, default: () ) –

    Positional arguments for the action.

  • kwargs (Any, default: {} ) –

    Keywords arguments for the action.

Returns:

  • R

    Result of the action.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shutdown for another reason (it self terminated or via another handle).

  • Exception

    Any exception raised by the action.

Source code in academy/handle.py
async def action(self, action: str, /, *args: Any, **kwargs: Any) -> R:
    """Invoke an action on the agent.

    Args:
        action: Action to invoke.
        args: Positional arguments for the action.
        kwargs: Keywords arguments for the action.

    Returns:
        Result of the action.

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shutdown for another reason
            (it self terminated or via another handle).
        Exception: Any exception raised by the action.
    """
    if self._agent_closed:
        raise AgentTerminatedError(self.agent_id)

    method = getattr(self.agent, action)
    return await method(*args, **kwargs)

ping async

ping(*, timeout: float | None = None) -> float

Ping the agent.

This is a no-op for proxy handles and returns 0 latency.

Parameters:

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for the response.

Returns:

  • float

    Round-trip time in seconds.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shutdown for another reason (it self terminated or via another handle).

  • TimeoutError

    If the timeout is exceeded.

Source code in academy/handle.py
async def ping(self, *, timeout: float | None = None) -> float:
    """Ping the agent.

    This is a no-op for proxy handles and returns 0 latency.

    Args:
        timeout: Optional timeout in seconds to wait for the response.

    Returns:
        Round-trip time in seconds.

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shutdown for another reason
            (it self terminated or via another handle).
        TimeoutError: If the timeout is exceeded.
    """
    if self._agent_closed:
        raise AgentTerminatedError(self.agent_id)
    return 0

shutdown async

shutdown(*, terminate: bool | None = None) -> None

Instruct the agent to shutdown.

This is non-blocking and will only send the message.

Parameters:

  • terminate (bool | None, default: None ) –

    Override the termination behavior of the agent defined in the RuntimeConfig.

Raises:

  • AgentTerminatedError

    If the agent's mailbox was closed. This typically indicates the agent shutdown for another reason (it self terminated or via another handle).

Source code in academy/handle.py
async def shutdown(self, *, terminate: bool | None = None) -> None:
    """Instruct the agent to shutdown.

    This is non-blocking and will only send the message.

    Args:
        terminate: Override the termination behavior of the agent defined
            in the [`RuntimeConfig`][academy.runtime.RuntimeConfig].

    Raises:
        AgentTerminatedError: If the agent's mailbox was closed. This
            typically indicates the agent shutdown for another reason
            (it self terminated or via another handle).
    """
    if self._agent_closed:
        raise AgentTerminatedError(self.agent_id)
    self._agent_closed = True if terminate is None else terminate