Skip to content

academy.exchange.proxystore

ProxyStoreExchangeTransport

ProxyStoreExchangeTransport(
    transport: ExchangeTransportT,
    store: Store[Any],
    should_proxy: Callable[[Any], bool],
    *,
    resolve_async: bool = False
)

Bases: ExchangeTransportMixin, NoPickleMixin, Generic[ExchangeTransportT]

ProxyStore exchange client bound to a specific mailbox.

Source code in academy/exchange/proxystore.py
def __init__(
    self,
    transport: ExchangeTransportT,
    store: Store[Any],
    should_proxy: Callable[[Any], bool],
    *,
    resolve_async: bool = False,
) -> None:
    self.transport = transport
    self.store = store
    self.should_proxy = should_proxy
    self.resolve_async = resolve_async
    register_store(store, exist_ok=True)

ProxyStoreExchangeFactory

ProxyStoreExchangeFactory(
    base: ExchangeFactory[ExchangeTransportT],
    store: Store[Any] | None,
    should_proxy: Callable[[Any], bool],
    *,
    resolve_async: bool = False
)

Bases: ExchangeFactory[ProxyStoreExchangeTransport[ExchangeTransportT]]

ProxStore exchange client factory.

A ProxyStore exchange is used to wrap an underlying exchange so large objects may be passed by reference.

Parameters:

  • base (ExchangeFactory[ExchangeTransportT]) –

    Base exchange factory.

  • store (Store[Any] | None) –

    Store to use for proxying data.

  • should_proxy (Callable[[Any], bool]) –

    A callable that returns True if an object should be proxied. This is applied to every positional and keyword argument and result value.

  • resolve_async (bool, default: False ) –

    Resolve proxies asynchronously when received.

Source code in academy/exchange/proxystore.py
def __init__(
    self,
    base: ExchangeFactory[ExchangeTransportT],
    store: Store[Any] | None,
    should_proxy: Callable[[Any], bool],
    *,
    resolve_async: bool = False,
) -> None:
    self.base = base
    self.store = store
    self.should_proxy = should_proxy
    self.resolve_async = resolve_async

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler,
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = factory.create_user_client()
registration = user_client.register_agent(...)
agent_client = factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler) –

    Agent request message handler.

Returns:

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/__init__.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler,
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = factory.create_user_client()
    registration = user_client.register_agent(...)
    agent_client = factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    assert transport.mailbox_id == agent_id
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        await transport.close()
        raise BadEntityIdError(agent_id)
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Returns:

Source code in academy/exchange/__init__.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a new user in the exchange and associated client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Start a message listener thread.

    Returns:
        User exchange client.
    """
    transport = await self._create_transport(mailbox_id=None, name=name)
    user_id = transport.mailbox_id
    assert isinstance(user_id, UserId)
    return UserExchangeClient(
        user_id,
        transport,
        start_listener=start_listener,
    )