Skip to content

academy.exchange.proxystore

ProxyStoreExchange

ProxyStoreExchange(
    exchange: Exchange,
    store: Store[Any],
    should_proxy: Callable[[Any], bool],
    *,
    resolve_async: bool = False
)

Bases: ExchangeMixin

Wrap an Exchange with ProxyStore support.

Sending large action payloads via the exchange can result in considerable slowdowns. This Exchange wrapper can replace arguments in action requests and results in action responses with proxies to reduce communication costs.

Parameters:

  • exchange (Exchange) –

    Exchange to wrap.

  • store (Store[Any]) –

    Store to use for proxying data.

  • should_proxy (Callable[[Any], bool]) –

    A callable that returns True if an object should be proxied. This is applied to every positional and keyword argument and result value.

  • resolve_async (bool, default: False ) –

    Resolve proxies asynchronously when received.

Source code in academy/exchange/proxystore.py
def __init__(
    self,
    exchange: Exchange,
    store: Store[Any],
    should_proxy: Callable[[Any], bool],
    *,
    resolve_async: bool = False,
) -> None:
    """Record the wrapped exchange and the proxying configuration.

    Args:
        exchange: Exchange to wrap.
        store: Store to use for proxying data.
        should_proxy: A callable that returns `True` if an object should
            be proxied.
        resolve_async: Resolve proxies asynchronously when received.
    """
    # Collaborators: the exchange being wrapped and the store backing
    # the proxies.
    self.exchange, self.store = exchange, store
    # Proxying policy consulted when sending and receiving messages.
    self.should_proxy, self.resolve_async = should_proxy, resolve_async
    # NOTE(review): exist_ok=True presumably tolerates a store that has
    # already been registered -- confirm against the ProxyStore docs.
    register_store(store, exist_ok=True)

close

close() -> None

Close the exchange client.

Note

This does not alter the state of the exchange.

Source code in academy/exchange/proxystore.py
def close(self) -> None:
    """Close the exchange client by closing the wrapped exchange.

    Only the wrapped exchange's `close()` is invoked here; the store is
    not touched by this method.

    Note:
        This does not alter the state of the exchange.
    """
    wrapped = self.exchange
    wrapped.close()

register_agent

register_agent(
    behavior: type[BehaviorT],
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    name: str | None = None
) -> AgentId[BehaviorT]

Create a new agent identifier and associated mailbox.

Parameters:

  • behavior (type[BehaviorT]) –

    Type of the behavior this agent will implement.

  • agent_id (AgentId[BehaviorT] | None, default: None ) –

    Specify the ID of the agent. Randomly generated by default.

  • name (str | None, default: None ) –

    Optional human-readable name for the agent. Ignored if agent_id is provided.

Returns:

  • AgentId[BehaviorT]

    Unique identifier for the agent's mailbox.

Source code in academy/exchange/proxystore.py
def register_agent(
    self,
    behavior: type[BehaviorT],
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    name: str | None = None,
) -> AgentId[BehaviorT]:
    """Create a new agent identifier and associated mailbox.

    The request is forwarded unchanged to the wrapped exchange.

    Args:
        behavior: Type of the behavior this agent will implement.
        agent_id: Specify the ID of the agent. Randomly generated by
            default.
        name: Optional human-readable name for the agent. Ignored if
            `agent_id` is provided.

    Returns:
        Unique identifier for the agent's mailbox, as returned by the
        wrapped exchange.
    """
    delegate = self.exchange.register_agent
    return delegate(behavior, agent_id=agent_id, name=name)

register_client

register_client(*, name: str | None = None) -> ClientId

Create a new client identifier and associated mailbox.

Parameters:

  • name (str | None, default: None ) –

    Optional human-readable name for the client.

Returns:

  • ClientId

    Unique identifier for the client's mailbox.

Source code in academy/exchange/proxystore.py
def register_client(
    self,
    *,
    name: str | None = None,
) -> ClientId:
    """Create a new client identifier and associated mailbox.

    The request is forwarded unchanged to the wrapped exchange.

    Args:
        name: Optional human-readable name for the client.

    Returns:
        Unique identifier for the client's mailbox, as returned by the
        wrapped exchange.
    """
    delegate = self.exchange.register_client
    return delegate(name=name)

terminate

terminate(uid: EntityId) -> None

Close the mailbox for an entity from the exchange.

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/proxystore.py
def terminate(self, uid: EntityId) -> None:
    """Close the mailbox for an entity in the exchange.

    The request is forwarded unchanged to the wrapped exchange.

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    wrapped = self.exchange
    wrapped.terminate(uid)

discover

discover(
    behavior: type[Behavior],
    *,
    allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents with a given behavior.

Parameters:

  • behavior (type[Behavior]) –

    Behavior type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the behavior.

Returns:

  • tuple[AgentId[Any], ...]

    Tuple of agent IDs implementing the behavior.

Source code in academy/exchange/proxystore.py
def discover(
    self,
    behavior: type[Behavior],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given behavior.

    The query is forwarded unchanged to the wrapped exchange.

    Args:
        behavior: Behavior type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            behavior.

    Returns:
        Tuple of agent IDs implementing the behavior, as returned by the
        wrapped exchange.
    """
    delegate = self.exchange.discover
    return delegate(behavior, allow_subclasses=allow_subclasses)

get_mailbox

get_mailbox(uid: EntityId) -> Mailbox

Get a client to a specific mailbox.

Parameters:

  • uid (EntityId) –

    EntityId of the mailbox.

Returns:

  • Mailbox

    Mailbox client.

Raises:

  • BadEntityIdError

    if a mailbox for uid does not exist.

Source code in academy/exchange/proxystore.py
def get_mailbox(self, uid: EntityId) -> Mailbox:
    """Get a client to a specific mailbox.

    The mailbox obtained from the wrapped exchange is itself wrapped in a
    `ProxyStoreMailbox` that is given this exchange and its
    `resolve_async` setting.

    Args:
        uid: EntityId of the mailbox.

    Returns:
        Mailbox client.

    Raises:
        BadEntityIdError: if a mailbox for `uid` does not exist.
    """
    return ProxyStoreMailbox(
        self.exchange.get_mailbox(uid),
        self,
        self.resolve_async,
    )

send

send(uid: EntityId, message: Message) -> None

Send a message to a mailbox.

Parameters:

  • uid (EntityId) –

    Destination address of the message.

  • message (Message) –

    Message to send.

Raises:

  • BadEntityIdError

    if a mailbox for uid does not exist.

  • MailboxClosedError

    if the mailbox was closed.

Source code in academy/exchange/proxystore.py
def send(self, uid: EntityId, message: Message) -> None:
    """Send a message to a mailbox.

    Before forwarding to the wrapped exchange, the positional and keyword
    arguments of an action request, and a non-`None` result of an action
    response, are passed through the proxying helpers together with this
    exchange's store and `should_proxy` predicate. The message object is
    updated in place.

    Args:
        uid: Destination address of the message.
        message: Message to send.

    Raises:
        BadEntityIdError: if a mailbox for `uid` does not exist.
        MailboxClosedError: if the mailbox was closed.
    """
    store, predicate = self.store, self.should_proxy

    if isinstance(message, ActionRequest):
        message.pargs = _proxy_iterable(message.pargs, store, predicate)
        message.kargs = _proxy_mapping(message.kargs, store, predicate)

    if isinstance(message, ActionResponse):
        result = message.result
        # A result of None is forwarded as-is.
        if result is not None:
            message.result = _proxy_item(result, store, predicate)

    self.exchange.send(uid, message)

get_handle

get_handle(
    aid: AgentId[BehaviorT],
) -> UnboundRemoteHandle[BehaviorT]

Create a new handle to an agent.

A handle enables a client to invoke actions on the agent.

Note

It is not possible to create a handle to a client since a handle is essentially a new client of a specific agent.

Parameters:

  • aid (AgentId[BehaviorT]) –

    EntityId of the agent to create a handle to.

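Returns:

  • UnboundRemoteHandle[BehaviorT]

    Handle to the agent.

Raises:

  • TypeError

    if aid is not an instance of AgentId.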

Source code in academy/exchange/__init__.py
def get_handle(
    self: Exchange,
    aid: AgentId[BehaviorT],
) -> UnboundRemoteHandle[BehaviorT]:
    """Create a new handle to an agent.

    A handle enables a client to invoke actions on the agent.

    Note:
        It is not possible to create a handle to a client since a handle
        is essentially a new client of a specific agent.

    Args:
        aid: EntityId of the agent to create a handle to.

    Returns:
        Handle to the agent.

    Raises:
        TypeError: if `aid` is not an instance of
            [`AgentId`][academy.identifier.AgentId].
    """
    # Happy path first: a genuine agent identifier yields an unbound handle.
    if isinstance(aid, AgentId):
        return UnboundRemoteHandle(self, aid)

    raise TypeError(
        f'Handle must be created from an {AgentId.__name__} '
        f'but got identifier with type {type(aid).__name__}.',
    )

ProxyStoreMailbox

ProxyStoreMailbox(
    mailbox: Mailbox,
    exchange: ProxyStoreExchange,
    resolve_async: bool = False,
)

Bases: NoPickleMixin

Client protocol that listens to incoming messages to a mailbox.

Parameters:

  • mailbox (Mailbox) –

    The mailbox created by the wrapped exchange.

  • exchange (ProxyStoreExchange) –

    The wrapper exchange.

  • resolve_async (bool, default: False ) –

    Begin resolving proxies in action requests or responses asynchronously once the message is received.

Source code in academy/exchange/proxystore.py
def __init__(
    self,
    mailbox: Mailbox,
    exchange: ProxyStoreExchange,
    resolve_async: bool = False,
) -> None:
    """Record the wrapped mailbox, the owning exchange, and the resolve mode.

    Args:
        mailbox: The mailbox created by the wrapped exchange.
        exchange: The wrapper exchange.
        resolve_async: Begin resolving proxies in action requests or
            responses asynchronously once the message is received.
    """
    self._exchange, self._mailbox, self._resolve_async = (
        exchange,
        mailbox,
        resolve_async,
    )

exchange property

exchange: Exchange

Exchange client.

mailbox_id property

mailbox_id: EntityId

Mailbox address/identifier.

close

close() -> None

Close this mailbox client.

Warning

This does not close the mailbox in the exchange. I.e., the exchange will still accept new messages to this mailbox, but this client will no longer be listening for them.

Source code in academy/exchange/proxystore.py
def close(self) -> None:
    """Close this mailbox client by closing the wrapped mailbox client.

    Warning:
        This does not close the mailbox in the exchange. I.e., the exchange
        will still accept new messages to this mailbox, but this client
        will no longer be listening for them.
    """
    wrapped = self._mailbox
    wrapped.close()

recv

recv(timeout: float | None = None) -> Message

Receive the next message in the mailbox.

This blocks until the next message is received or the mailbox is closed.

Parameters:

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for the next message. If None, the default, block forever until the next message or the mailbox is closed.

Raises:

  • MailboxClosedError

    if the mailbox was closed.

  • TimeoutError

    if a timeout was specified and exceeded.

Source code in academy/exchange/proxystore.py
def recv(self, timeout: float | None = None) -> Message:
    """Receive the next message in the mailbox.

    This blocks until the next message is received or the mailbox
    is closed.

    When asynchronous resolution is enabled for this mailbox,
    `resolve_async` is called on each positional and keyword argument of
    an action request, or on the result of an action response, whose
    exact type is `Proxy`. The message itself is returned unmodified.

    Args:
        timeout: Optional timeout in seconds to wait for the next
            message. If `None`, the default, block forever until the
            next message or the mailbox is closed.

    Returns:
        The message received from the wrapped mailbox.

    Raises:
        MailboxClosedError: if the mailbox was closed.
        TimeoutError: if a `timeout` was specified and exceeded.
    """
    message = self._mailbox.recv(timeout)

    # Nothing to kick off when asynchronous resolution is disabled.
    if not self._resolve_async:
        return message

    # NOTE(review): the exact-type checks below (rather than isinstance)
    # are presumably deliberate so that only real Proxy objects are
    # selected -- confirm against the ProxyStore documentation.
    if isinstance(message, ActionRequest):
        candidates = (*message.pargs, *message.kargs.values())
        for candidate in candidates:
            if type(candidate) is Proxy:
                resolve_async(candidate)
    elif isinstance(message, ActionResponse):
        if type(message.result) is Proxy:
            resolve_async(message.result)

    return message