academy.exchange

AgentExchangeClient

AgentExchangeClient(
    agent_id: AgentId[AgentT],
    transport: ExchangeTransportT,
    request_handler: RequestHandler[RequestT_co],
)

Bases: ExchangeClient[ExchangeTransportT], Generic[AgentT, ExchangeTransportT]

Agent exchange client.

Warning

Agent exchange clients should only be created via ExchangeFactory.create_agent_client()!

Parameters:

  • agent_id (AgentId[AgentT]) –

    Agent ID.

  • transport (ExchangeTransportT) –

    Exchange transport bound to agent_id.

  • request_handler (RequestHandler[RequestT_co]) –

    Request handler of the agent, called for each message received in this agent's mailbox.

Source code in academy/exchange/client.py
def __init__(
    self,
    agent_id: AgentId[AgentT],
    transport: ExchangeTransportT,
    request_handler: RequestHandler[RequestT_co],
) -> None:
    super().__init__(transport)
    self._agent_id = agent_id
    self._request_handler = request_handler

client_id property

client_id: AgentId[AgentT]

Agent ID of the client.

discover async

discover(
    agent: type[Agent], *, allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents of a given agent type.

Parameters:

  • agent (type[Agent]) –

    Agent type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

  • tuple[AgentId[Any], ...]

    Tuple of agent IDs implementing the agent.

Source code in academy/exchange/client.py
async def discover(
    self,
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given agent.

    Args:
        agent: Agent type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent IDs implementing the agent.
    """
    return await self._transport.discover(
        agent,
        allow_subclasses=allow_subclasses,
    )

factory

Get an exchange factory.

Source code in academy/exchange/client.py
def factory(self) -> ExchangeFactory[ExchangeTransportT]:
    """Get an exchange factory."""
    return self._transport.factory()

register_handle

register_handle(handle: Handle[AgentT]) -> None

Register an existing handle to receive messages.

Parameters:

  • handle (Handle[AgentT]) –

    Handle to register.

Source code in academy/exchange/client.py
def register_handle(self, handle: Handle[AgentT]) -> None:
    """Register an existing handle to receive messages.

    Args:
        handle: Handle to register.
    """
    self._handles[handle.handle_id] = handle

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]

Register a new agent and associated mailbox with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

  • AgentRegistration[AgentT]

    Agent registration info.

Source code in academy/exchange/client.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistration[AgentT]:
    """Register a new agent and associated mailbox with the exchange.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info.
    """
    registration = await self._transport.register_agent(
        agent,
        name=name,
    )
    logger.info('Registered %s in exchange', registration.agent_id)
    return registration

send async

send(message: Message[Any]) -> None

Send a message to a mailbox.

Parameters:

  • message (Message[Any]) –

    Message to send.

Raises:

  • BadEntityIdError

    If a mailbox for message.dest does not exist.

  • MailboxTerminatedError

    If the mailbox was closed.

Source code in academy/exchange/client.py
async def send(self, message: Message[Any]) -> None:
    """Send a message to a mailbox.

    Args:
        message: Message to send.

    Raises:
        BadEntityIdError: If a mailbox for `message.dest` does not exist.
        MailboxTerminatedError: If the mailbox was closed.
    """
    await self._transport.send(message)
    logger.debug('Sent %s to %s', type(message).__name__, message.dest)

status async

status(uid: EntityId) -> MailboxStatus

Check the status of a mailbox in the exchange.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to check.

Source code in academy/exchange/client.py
async def status(self, uid: EntityId) -> MailboxStatus:
    """Check the status of a mailbox in the exchange.

    Args:
        uid: Entity identifier of the mailbox to check.
    """
    return await self._transport.status(uid)

terminate async

terminate(uid: EntityId) -> None

Terminate a mailbox in the exchange.

Terminating a mailbox means that the corresponding entity will no longer be able to receive messages.

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/client.py
async def terminate(self, uid: EntityId) -> None:
    """Terminate a mailbox in the exchange.

    Terminating a mailbox means that the corresponding entity will no
    longer be able to receive messages.

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    await self._transport.terminate(uid)

close async

close() -> None

Close the agent client.

This closes the underlying exchange transport and all handles created by this client. The agent's mailbox will not be terminated so the agent can be started again later.

Source code in academy/exchange/client.py
async def close(self) -> None:
    """Close the agent client.

    This closes the underlying exchange transport and all handles created
    by this client. The agent's mailbox will not be terminated so the agent
    can be started again later.
    """
    async with self._close_lock:
        if self._closed:
            return

        await self._transport.close()
        self._closed = True
        logger.info('Closed exchange client for %s', self.client_id)
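
The lock-and-flag pattern in close() makes repeated or concurrent calls safe: only the first caller closes the transport. The following is a minimal sketch of that pattern, assuming a hypothetical FakeTransport and SketchClient that stand in for the real classes; it is not the library's implementation.

```python
import asyncio


class FakeTransport:
    """Hypothetical stand-in for an exchange transport."""

    def __init__(self) -> None:
        self.close_calls = 0

    async def close(self) -> None:
        # Yield to the event loop so concurrent close() calls can interleave.
        await asyncio.sleep(0)
        self.close_calls += 1


class SketchClient:
    """Hypothetical client that mirrors the lock-and-flag close pattern."""

    def __init__(self, transport: FakeTransport) -> None:
        self._transport = transport
        self._close_lock = asyncio.Lock()
        self._closed = False

    async def close(self) -> None:
        async with self._close_lock:
            if self._closed:
                # A previous caller already closed the transport.
                return
            await self._transport.close()
            self._closed = True


async def demo() -> int:
    client = SketchClient(FakeTransport())
    # Three concurrent calls; the lock serializes them.
    await asyncio.gather(client.close(), client.close(), client.close())
    return client._transport.close_calls


close_calls = asyncio.run(demo())
print(close_calls)
```

Without the lock, two callers could both pass the flag check before either sets it, and the transport would be closed twice.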

ExchangeClient

ExchangeClient(transport: ExchangeTransportT)

Bases: ABC, Generic[ExchangeTransportT]

Base exchange client.

Warning

Exchange clients should only be created via ExchangeFactory.create_agent_client() or ExchangeFactory.create_user_client()!

Parameters:

Source code in academy/exchange/client.py
def __init__(
    self,
    transport: ExchangeTransportT,
) -> None:
    self._transport = transport
    self._handles: WeakValueDictionary[uuid.UUID, Handle[Any]] = (
        WeakValueDictionary()
    )
    self._close_lock = asyncio.Lock()
    self._closed = False

client_id abstractmethod property

client_id: EntityId

Client ID as registered with the exchange.

close abstractmethod async

close() -> None

Close the transport.

Source code in academy/exchange/client.py
@abc.abstractmethod
async def close(self) -> None:
    """Close the transport."""
    ...

discover async

discover(
    agent: type[Agent], *, allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents of a given agent type.

Parameters:

  • agent (type[Agent]) –

    Agent type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

  • tuple[AgentId[Any], ...]

    Tuple of agent IDs implementing the agent.

Source code in academy/exchange/client.py
async def discover(
    self,
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given agent.

    Args:
        agent: Agent type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent IDs implementing the agent.
    """
    return await self._transport.discover(
        agent,
        allow_subclasses=allow_subclasses,
    )
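
How a transport matches agent types is not shown on this page. As a rough illustration of what allow_subclasses could mean, the sketch below filters a hypothetical in-memory REGISTRY with issubclass; the registry, the string IDs, and the Worker classes are all assumptions made for the example.

```python
from __future__ import annotations


class Agent:
    """Hypothetical base agent type."""


class Worker(Agent):
    pass


class GpuWorker(Worker):
    pass


# Hypothetical registry mapping agent IDs (plain strings here) to agent types.
REGISTRY: dict[str, type[Agent]] = {
    'a1': Worker,
    'a2': GpuWorker,
    'a3': Agent,
}


def discover(
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[str, ...]:
    matches = []
    for agent_id, agent_type in REGISTRY.items():
        exact = agent_type is agent
        # issubclass() is also true for the class itself, so check it only
        # when subclasses are allowed.
        if exact or (allow_subclasses and issubclass(agent_type, agent)):
            matches.append(agent_id)
    return tuple(matches)


with_subclasses = discover(Worker)
exact_only = discover(Worker, allow_subclasses=False)
print(with_subclasses, exact_only)
```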

factory

Get an exchange factory.

Source code in academy/exchange/client.py
def factory(self) -> ExchangeFactory[ExchangeTransportT]:
    """Get an exchange factory."""
    return self._transport.factory()

register_handle

register_handle(handle: Handle[AgentT]) -> None

Register an existing handle to receive messages.

Parameters:

  • handle (Handle[AgentT]) –

    Handle to register.

Source code in academy/exchange/client.py
def register_handle(self, handle: Handle[AgentT]) -> None:
    """Register an existing handle to receive messages.

    Args:
        handle: Handle to register.
    """
    self._handles[handle.handle_id] = handle
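
Because _handles is a WeakValueDictionary, registering a handle does not keep it alive: once the caller drops its last reference, the entry disappears from the registry. The sketch below shows this with a hypothetical FakeHandle class; only the standard-library behavior is being demonstrated.

```python
import gc
import uuid
from weakref import WeakValueDictionary


class FakeHandle:
    """Hypothetical stand-in for a Handle with a handle_id attribute."""

    def __init__(self) -> None:
        self.handle_id = uuid.uuid4()


handles: WeakValueDictionary = WeakValueDictionary()

handle = FakeHandle()
handles[handle.handle_id] = handle
count_while_alive = len(handles)

# Drop the only strong reference; the registry holds the handle weakly.
del handle
gc.collect()
count_after_drop = len(handles)

print(count_while_alive, count_after_drop)
```

The practical consequence is that the caller must hold a reference to a registered handle for as long as it should receive messages.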

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]

Register a new agent and associated mailbox with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

  • AgentRegistration[AgentT]

    Agent registration info.

Source code in academy/exchange/client.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistration[AgentT]:
    """Register a new agent and associated mailbox with the exchange.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info.
    """
    registration = await self._transport.register_agent(
        agent,
        name=name,
    )
    logger.info('Registered %s in exchange', registration.agent_id)
    return registration

send async

send(message: Message[Any]) -> None

Send a message to a mailbox.

Parameters:

  • message (Message[Any]) –

    Message to send.

Raises:

  • BadEntityIdError

    If a mailbox for message.dest does not exist.

  • MailboxTerminatedError

    If the mailbox was closed.

Source code in academy/exchange/client.py
async def send(self, message: Message[Any]) -> None:
    """Send a message to a mailbox.

    Args:
        message: Message to send.

    Raises:
        BadEntityIdError: If a mailbox for `message.dest` does not exist.
        MailboxTerminatedError: If the mailbox was closed.
    """
    await self._transport.send(message)
    logger.debug('Sent %s to %s', type(message).__name__, message.dest)

status async

status(uid: EntityId) -> MailboxStatus

Check the status of a mailbox in the exchange.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to check.

Source code in academy/exchange/client.py
async def status(self, uid: EntityId) -> MailboxStatus:
    """Check the status of a mailbox in the exchange.

    Args:
        uid: Entity identifier of the mailbox to check.
    """
    return await self._transport.status(uid)

terminate async

terminate(uid: EntityId) -> None

Terminate a mailbox in the exchange.

Terminating a mailbox means that the corresponding entity will no longer be able to receive messages.

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/client.py
async def terminate(self, uid: EntityId) -> None:
    """Terminate a mailbox in the exchange.

    Terminating a mailbox means that the corresponding entity will no
    longer be able to receive messages.

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    await self._transport.terminate(uid)
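
The documented contract for send(), status(), and terminate() can be pictured with a small in-memory model. The sketch below is synchronous for brevity and entirely hypothetical: InMemoryExchange, the MISSING and TERMINATED status names, and the local exception classes are assumptions that only mirror the behavior described above.

```python
import enum


class MailboxStatus(enum.Enum):
    # ACTIVE appears in the source above; the other names are assumed.
    MISSING = 'MISSING'
    ACTIVE = 'ACTIVE'
    TERMINATED = 'TERMINATED'


class BadEntityIdError(Exception):
    """Local stand-in: no mailbox exists for the destination."""


class MailboxTerminatedError(Exception):
    """Local stand-in: the destination mailbox was closed."""


class InMemoryExchange:
    def __init__(self) -> None:
        self._status: dict[str, MailboxStatus] = {}
        self._queues: dict[str, list[str]] = {}

    def register(self, uid: str) -> None:
        self._status[uid] = MailboxStatus.ACTIVE
        self._queues[uid] = []

    def status(self, uid: str) -> MailboxStatus:
        return self._status.get(uid, MailboxStatus.MISSING)

    def terminate(self, uid: str) -> None:
        # No-op if the mailbox does not exist.
        if uid in self._status:
            self._status[uid] = MailboxStatus.TERMINATED

    def send(self, dest: str, message: str) -> None:
        status = self.status(dest)
        if status is MailboxStatus.MISSING:
            raise BadEntityIdError(dest)
        if status is MailboxStatus.TERMINATED:
            raise MailboxTerminatedError(dest)
        self._queues[dest].append(message)


exchange = InMemoryExchange()
exchange.register('agent-1')
exchange.send('agent-1', 'ping')
exchange.terminate('unknown')  # silently ignored
exchange.terminate('agent-1')
final_status = exchange.status('agent-1')
```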

UserExchangeClient

UserExchangeClient(
    user_id: UserId,
    transport: ExchangeTransportT,
    *,
    start_listener: bool = True
)

Bases: ExchangeClient[ExchangeTransportT]

User exchange client.

Warning

User exchange clients should only be created via ExchangeFactory.create_user_client()!

Parameters:

  • user_id (UserId) –

    User ID.

  • transport (ExchangeTransportT) –

    Exchange transport bound to user_id.

  • start_listener (bool, default: True ) –

    Start a background asyncio task that listens for messages.

Source code in academy/exchange/client.py
def __init__(
    self,
    user_id: UserId,
    transport: ExchangeTransportT,
    *,
    start_listener: bool = True,
) -> None:
    super().__init__(transport)
    self._user_id = user_id
    self._listener_task: asyncio.Task[None] | None = None
    if start_listener:
        self._listener_task = asyncio.create_task(
            self._listen_for_messages(),
            name=f'user-exchange-listener-{self.client_id}',
        )
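
Since the constructor calls asyncio.create_task(), a client with start_listener=True has to be created while an event loop is running. The sketch below shows the general shape of a named background listener task and one way to stop it; listen_for_messages and the queue-based inbox are hypothetical, and the library's own _stop_listener_task() is not shown on this page.

```python
import asyncio


async def listen_for_messages(inbox: asyncio.Queue, seen: list) -> None:
    # Hypothetical listener loop: record every message that arrives.
    while True:
        seen.append(await inbox.get())


async def demo() -> tuple:
    inbox: asyncio.Queue = asyncio.Queue()
    seen: list = []
    task = asyncio.create_task(
        listen_for_messages(inbox, seen),
        name='user-exchange-listener-demo',
    )
    await inbox.put('hello')
    await asyncio.sleep(0.01)  # give the listener a chance to run

    # Stop the listener: cancel it and wait for the cancellation to finish.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.get_name(), seen, task.cancelled()


task_name, seen_messages, was_cancelled = asyncio.run(demo())
print(task_name, seen_messages, was_cancelled)
```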

client_id property

client_id: UserId

User ID of the client.

discover async

discover(
    agent: type[Agent], *, allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents of a given agent type.

Parameters:

  • agent (type[Agent]) –

    Agent type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

  • tuple[AgentId[Any], ...]

    Tuple of agent IDs implementing the agent.

Source code in academy/exchange/client.py
async def discover(
    self,
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given agent.

    Args:
        agent: Agent type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent IDs implementing the agent.
    """
    return await self._transport.discover(
        agent,
        allow_subclasses=allow_subclasses,
    )

factory

Get an exchange factory.

Source code in academy/exchange/client.py
def factory(self) -> ExchangeFactory[ExchangeTransportT]:
    """Get an exchange factory."""
    return self._transport.factory()

register_handle

register_handle(handle: Handle[AgentT]) -> None

Register an existing handle to receive messages.

Parameters:

  • handle (Handle[AgentT]) –

    Handle to register.

Source code in academy/exchange/client.py
def register_handle(self, handle: Handle[AgentT]) -> None:
    """Register an existing handle to receive messages.

    Args:
        handle: Handle to register.
    """
    self._handles[handle.handle_id] = handle

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]

Register a new agent and associated mailbox with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

  • AgentRegistration[AgentT]

    Agent registration info.

Source code in academy/exchange/client.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistration[AgentT]:
    """Register a new agent and associated mailbox with the exchange.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info.
    """
    registration = await self._transport.register_agent(
        agent,
        name=name,
    )
    logger.info('Registered %s in exchange', registration.agent_id)
    return registration

send async

send(message: Message[Any]) -> None

Send a message to a mailbox.

Parameters:

  • message (Message[Any]) –

    Message to send.

Raises:

  • BadEntityIdError

    If a mailbox for message.dest does not exist.

  • MailboxTerminatedError

    If the mailbox was closed.

Source code in academy/exchange/client.py
async def send(self, message: Message[Any]) -> None:
    """Send a message to a mailbox.

    Args:
        message: Message to send.

    Raises:
        BadEntityIdError: If a mailbox for `message.dest` does not exist.
        MailboxTerminatedError: If the mailbox was closed.
    """
    await self._transport.send(message)
    logger.debug('Sent %s to %s', type(message).__name__, message.dest)

status async

status(uid: EntityId) -> MailboxStatus

Check the status of a mailbox in the exchange.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to check.

Source code in academy/exchange/client.py
async def status(self, uid: EntityId) -> MailboxStatus:
    """Check the status of a mailbox in the exchange.

    Args:
        uid: Entity identifier of the mailbox to check.
    """
    return await self._transport.status(uid)

terminate async

terminate(uid: EntityId) -> None

Terminate a mailbox in the exchange.

Terminating a mailbox means that the corresponding entity will no longer be able to receive messages.

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/client.py
async def terminate(self, uid: EntityId) -> None:
    """Terminate a mailbox in the exchange.

    Terminating a mailbox means that the corresponding entity will no
    longer be able to receive messages.

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    await self._transport.terminate(uid)

close async

close() -> None

Close the user client.

This terminates the user's mailbox, stops the message listener task, and closes the underlying exchange transport.

Source code in academy/exchange/client.py
async def close(self) -> None:
    """Close the user client.

    This terminates the user's mailbox, closes the underlying exchange
    transport.
    """
    async with self._close_lock:
        if self._closed:
            return

        await self._transport.terminate(self.client_id)
        logger.info('Terminated mailbox for %s', self.client_id)
        await self._stop_listener_task()
        await self._transport.close()
        self._closed = True
        logger.info('Closed exchange client for %s', self.client_id)

HttpExchangeFactory

HttpExchangeFactory(
    url: str,
    auth_method: Literal["globus"] | None = None,
    additional_headers: dict[str, str] | None = None,
    ssl_verify: bool | None = None,
)

Bases: ExchangeFactory[HttpExchangeTransport]

HTTP exchange client factory.

Parameters:

  • url (str) –

    Address of the HTTP exchange.

  • auth_method (Literal['globus'] | None, default: None ) –

    Method used to get authorization headers.

  • additional_headers (dict[str, str] | None, default: None ) –

    Any other information necessary to communicate with the exchange. Used for passing the Globus bearer token.

  • ssl_verify (bool | None, default: None ) –

    Whether the server's TLS certificate should be validated (same as requests.Session.verify). Should be True when using HTTPS; only set to False for testing or local development. If None, verification is enabled when the URL scheme is https.

Source code in academy/exchange/cloud/client.py
def __init__(
    self,
    url: str,
    auth_method: Literal['globus'] | None = None,
    additional_headers: dict[str, str] | None = None,
    ssl_verify: bool | None = None,
) -> None:
    if additional_headers is None:
        additional_headers = {}
    additional_headers |= get_auth_headers(auth_method)

    self._info = _HttpConnectionInfo(
        url=url,
        additional_headers=additional_headers,
        ssl_verify=ssl_verify,
    )
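
Note that `|=` updates a dict in place, so a dict passed as additional_headers is itself modified by the constructor. The sketch below illustrates the difference between merging in place and merging into a copy; get_auth_headers here is a hypothetical stub with a placeholder token, not the library function, and `|=` on dicts requires Python 3.9 or later.

```python
def get_auth_headers(auth_method):
    # Hypothetical stub standing in for the library helper.
    if auth_method == 'globus':
        return {'Authorization': 'Bearer example-token'}
    return {}


def build_headers_in_place(additional_headers=None, auth_method=None):
    if additional_headers is None:
        additional_headers = {}
    # In-place merge: the caller's dict object is updated.
    additional_headers |= get_auth_headers(auth_method)
    return additional_headers


def build_headers_copy(additional_headers=None, auth_method=None):
    # Merge into a new dict; the caller's dict is left untouched.
    return {**(additional_headers or {}), **get_auth_headers(auth_method)}


caller_headers = {'X-Trace': 'abc'}
merged = build_headers_in_place(caller_headers, 'globus')

original = {'X-Trace': 'abc'}
copied = build_headers_copy(original, 'globus')

print(merged is caller_headers, original)
```

Callers who reuse one headers dict across several factories may want to pass a copy.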

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = await factory.create_user_client()
registration = await user_client.register_agent(...)
agent_client = await factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler[RequestT_co]) –

    Agent request message handler.

Returns:

  • AgentExchangeClient[AgentT, ExchangeTransportT]

    Agent exchange client.

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/factory.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = await factory.create_user_client()
    registration = await user_client.register_agent(...)
    agent_client = await factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    assert transport.mailbox_id == agent_id
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        await transport.close()
        raise BadEntityIdError(agent_id)
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )
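
The body of create_agent_client() checks the mailbox status before handing out a client, and closes the freshly created transport if the check fails so that no connection is leaked. A minimal sketch of that check-then-clean-up step follows; FakeTransport, connect_agent, the MISSING status name, and the local BadEntityIdError are hypothetical stand-ins.

```python
import asyncio
import enum


class MailboxStatus(enum.Enum):
    ACTIVE = enum.auto()
    MISSING = enum.auto()  # assumed name for "not registered"


class BadEntityIdError(Exception):
    """Local stand-in for the library exception."""


class FakeTransport:
    """Hypothetical transport that knows which IDs are registered."""

    def __init__(self, mailbox_id: str, registered: set) -> None:
        self.mailbox_id = mailbox_id
        self._registered = registered
        self.closed = False

    async def status(self, uid: str) -> MailboxStatus:
        if uid in self._registered:
            return MailboxStatus.ACTIVE
        return MailboxStatus.MISSING

    async def close(self) -> None:
        self.closed = True


async def connect_agent(transport: FakeTransport, agent_id: str) -> FakeTransport:
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        # Release the transport before reporting the error.
        await transport.close()
        raise BadEntityIdError(agent_id)
    return transport


async def demo() -> tuple:
    registered = {'agent-1'}
    good = FakeTransport('agent-1', registered)
    bad = FakeTransport('agent-2', registered)
    await connect_agent(good, 'agent-1')
    try:
        await connect_agent(bad, 'agent-2')
        rejected = False
    except BadEntityIdError:
        rejected = True
    return good.closed, bad.closed, rejected


good_closed, bad_closed, rejected = asyncio.run(demo())
```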

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a background asyncio task that listens for messages.

Returns:

  • UserExchangeClient[ExchangeTransportT]

    User exchange client.

Source code in academy/exchange/factory.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a new user in the exchange and associated client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Start a message listener thread.

    Returns:
        User exchange client.
    """
    transport = await self._create_transport(mailbox_id=None, name=name)
    user_id = transport.mailbox_id
    assert isinstance(user_id, UserId)
    return UserExchangeClient(
        user_id,
        transport,
        start_listener=start_listener,
    )

HttpExchangeTransport

HttpExchangeTransport(
    mailbox_id: EntityId,
    session: ClientSession,
    connection_info: _HttpConnectionInfo,
)

Bases: ExchangeTransportMixin, NoPickleMixin

HTTP exchange client.

Parameters:

  • mailbox_id (EntityId) –

    Identifier of the mailbox on the exchange. The constructor requires an ID; when new() is called without one, a new user mailbox is registered with the exchange.

  • session (ClientSession) –

    HTTP session.

  • connection_info (_HttpConnectionInfo) –

    Exchange connection info.

Source code in academy/exchange/cloud/client.py
def __init__(
    self,
    mailbox_id: EntityId,
    session: aiohttp.ClientSession,
    connection_info: _HttpConnectionInfo,
) -> None:
    self._mailbox_id = mailbox_id
    self._session = session
    self._info = connection_info

    base_url = self._info.url
    self._mailbox_url = f'{base_url}/mailbox'
    self._message_url = f'{base_url}/message'
    self._discover_url = f'{base_url}/discover'

new async classmethod

new(
    *,
    connection_info: _HttpConnectionInfo,
    mailbox_id: EntityId | None = None,
    name: str | None = None
) -> Self

Instantiate a new transport.

Parameters:

  • connection_info (_HttpConnectionInfo) –

    Exchange connection information.

  • mailbox_id (EntityId | None, default: None ) –

    Bind the transport to the specific mailbox. If None, a new user entity will be registered and the transport will be bound to that mailbox.

  • name (str | None, default: None ) –

    Display name of the registered entity if mailbox_id is None.

Returns:

  • Self

    An instantiated transport bound to a specific mailbox.

Source code in academy/exchange/cloud/client.py
@classmethod
async def new(
    cls,
    *,
    connection_info: _HttpConnectionInfo,
    mailbox_id: EntityId | None = None,
    name: str | None = None,
) -> Self:
    """Instantiate a new transport.

    Args:
        connection_info: Exchange connection information.
        mailbox_id: Bind the transport to the specific mailbox. If `None`,
            a new user entity will be registered and the transport will be
            bound to that mailbox.
        name: Display name of the registered entity if `mailbox_id` is
            `None`.

    Returns:
        An instantiated transport bound to a specific mailbox.
    """
    ssl_verify = connection_info.ssl_verify
    if ssl_verify is None:  # pragma: no branch
        scheme = urlparse(connection_info.url).scheme
        ssl_verify = scheme == 'https'

    session = aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=ssl_verify),
        headers=connection_info.additional_headers,
    )

    if mailbox_id is None:
        mailbox_id = UserId.new(name=name)
        async with session.post(
            f'{connection_info.url}/mailbox',
            json={'mailbox': mailbox_id.model_dump_json()},
        ) as response:
            _raise_for_status(response, mailbox_id)
        logger.info('Registered %s in exchange', mailbox_id)

    return cls(mailbox_id, session, connection_info)
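
When ssl_verify is left as None, new() derives it from the URL scheme: certificate verification is enabled for https URLs and disabled otherwise. The helper below isolates that rule; resolve_ssl_verify and the example URLs are hypothetical names used only for illustration.

```python
from __future__ import annotations

from urllib.parse import urlparse


def resolve_ssl_verify(url: str, ssl_verify: bool | None = None) -> bool:
    # An explicit True or False always wins.
    if ssl_verify is not None:
        return ssl_verify
    # Otherwise verify certificates only for https URLs.
    return urlparse(url).scheme == 'https'


https_default = resolve_ssl_verify('https://exchange.example.org')
http_default = resolve_ssl_verify('http://localhost:8700')
https_override = resolve_ssl_verify('https://localhost:8700', ssl_verify=False)
print(https_default, http_default, https_override)
```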

ExchangeFactory

Bases: ABC, Generic[ExchangeTransportT]

Exchange client factory.

An exchange factory is used to mint new exchange clients for users and agents, encapsulating the complexities of instantiating the underlying communication classes (the ExchangeTransport).

Warning

Factory implementations must be efficiently pickleable because factory instances are shared between user and agent processes so that all entities can create clients to the same exchange.

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = await factory.create_user_client()
registration = await user_client.register_agent(...)
agent_client = await factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler[RequestT_co]) –

    Agent request message handler.

Returns:

  • AgentExchangeClient[AgentT, ExchangeTransportT]

    Agent exchange client.

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/factory.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = await factory.create_user_client()
    registration = await user_client.register_agent(...)
    agent_client = await factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    assert transport.mailbox_id == agent_id
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        await transport.close()
        raise BadEntityIdError(agent_id)
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a background asyncio task that listens for messages.

Returns:

  • UserExchangeClient[ExchangeTransportT]

    User exchange client.

Source code in academy/exchange/factory.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a new user in the exchange and associated client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Start a message listener thread.

    Returns:
        User exchange client.
    """
    transport = await self._create_transport(mailbox_id=None, name=name)
    user_id = transport.mailbox_id
    assert isinstance(user_id, UserId)
    return UserExchangeClient(
        user_id,
        transport,
        start_listener=start_listener,
    )

HybridExchangeFactory

HybridExchangeFactory(
    redis_host: str,
    redis_port: int,
    *,
    redis_kwargs: dict[str, Any] | None = None,
    interface: str | None = None,
    namespace: str | None = "default",
    ports: Iterable[int] | None = None
)

Bases: ExchangeFactory[HybridExchangeTransport]

Hybrid exchange client factory.

The hybrid exchange uses peer-to-peer communication via TCP and a central Redis server for mailbox state and queueing messages for offline entities.

Parameters:

  • redis_host (str) –

    Redis server hostname.

  • redis_port (int) –

    Redis server port.

  • redis_kwargs (dict[str, Any] | None, default: None ) –

    Extra keyword arguments to pass to redis.Redis().

  • interface (str | None, default: None ) –

    Network interface to use for peer-to-peer communication. If None, the hostname of the local host is used.

  • namespace (str | None, default: 'default' ) –

    Redis key namespace. If None, a random key prefix is generated.

  • ports (Iterable[int] | None, default: None ) –

    An iterable of ports to give each client a unique port from a user-defined set. A StopIteration exception will be raised in create_*_client() methods if the number of clients in the process is greater than the length of the iterable.

Source code in academy/exchange/hybrid.py
def __init__(  # noqa: PLR0913
    self,
    redis_host: str,
    redis_port: int,
    *,
    redis_kwargs: dict[str, Any] | None = None,
    interface: str | None = None,
    namespace: str | None = 'default',
    ports: Iterable[int] | None = None,
) -> None:
    self._namespace = (
        namespace
        if namespace is not None
        else uuid_to_base32(uuid.uuid4())
    )
    self._interface = interface
    self._redis_info = _RedisConnectionInfo(
        redis_host,
        redis_port,
        redis_kwargs if redis_kwargs is not None else {},
    )
    self._ports = None if ports is None else iter(ports)
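
The ports argument is wrapped in a single iterator, so each client created from the factory consumes one port and the supply runs out once the iterable is exhausted. The sketch below shows that behavior in isolation; PortAllocator and next_port are hypothetical names, and returning None when no ports were given is an assumption made for the example.

```python
from __future__ import annotations

from collections.abc import Iterable


class PortAllocator:
    """Hypothetical helper mirroring how the factory stores its ports."""

    def __init__(self, ports: Iterable[int] | None = None) -> None:
        self._ports = None if ports is None else iter(ports)

    def next_port(self) -> int | None:
        if self._ports is None:
            # No user-defined set: nothing to hand out in this sketch.
            return None
        # Raises StopIteration once every port has been handed out.
        return next(self._ports)


allocator = PortAllocator([6000, 6001])
first = allocator.next_port()
second = allocator.next_port()
try:
    allocator.next_port()
    exhausted = False
except StopIteration:
    exhausted = True

print(first, second, exhausted)
```

Passing a generous range, such as range(6000, 6100), avoids running out when many clients share one process.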

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = await factory.create_user_client()
registration = await user_client.register_agent(...)
agent_client = await factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler[RequestT_co]) –

    Agent request message handler.

Returns:

  • AgentExchangeClient[AgentT, ExchangeTransportT]

    Agent exchange client.

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/factory.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = await factory.create_user_client()
    registration = await user_client.register_agent(...)
    agent_client = await factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    assert transport.mailbox_id == agent_id
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        await transport.close()
        raise BadEntityIdError(agent_id)
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )
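
The status guard at the end of create_agent_client() can be tried in isolation. The following is a minimal sketch under stated assumptions: FakeTransport, checked_transport, and the string IDs are hypothetical stand-ins, and MailboxStatus and BadEntityIdError are redefined locally so the snippet runs without Academy installed.

```python
import asyncio
import enum


class MailboxStatus(enum.Enum):
    MISSING = 'MISSING'
    ACTIVE = 'ACTIVE'
    TERMINATED = 'TERMINATED'


class BadEntityIdError(Exception):
    """Local stand-in for the exception of the same name."""


class FakeTransport:
    """Hypothetical in-memory transport used only for illustration."""

    def __init__(self, mailbox_id: str, registered: set) -> None:
        self.mailbox_id = mailbox_id
        self._registered = registered
        self.closed = False

    async def status(self, uid: str) -> MailboxStatus:
        if uid in self._registered:
            return MailboxStatus.ACTIVE
        return MailboxStatus.MISSING

    async def close(self) -> None:
        self.closed = True


async def checked_transport(agent_id: str, registered: set) -> FakeTransport:
    transport = FakeTransport(agent_id, registered)
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        # Release the transport before reporting the bad ID.
        await transport.close()
        raise BadEntityIdError(agent_id)
    return transport


async def demo() -> tuple:
    ok = await checked_transport('agent-a', {'agent-a'})
    try:
        await checked_transport('agent-b', {'agent-a'})
        rejected = False
    except BadEntityIdError:
        rejected = True
    return ok.mailbox_id, rejected


result = asyncio.run(demo())
```

Closing the transport before raising keeps a failed lookup from leaking a connection.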

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Returns:

  • UserExchangeClient[ExchangeTransportT]

    User exchange client.

Source code in academy/exchange/factory.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a new user in the exchange and associated client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Start a message listener thread.

    Returns:
        User exchange client.
    """
    transport = await self._create_transport(mailbox_id=None, name=name)
    user_id = transport.mailbox_id
    assert isinstance(user_id, UserId)
    return UserExchangeClient(
        user_id,
        transport,
        start_listener=start_listener,
    )

HybridExchangeTransport

HybridExchangeTransport(
    mailbox_id: EntityId,
    redis_client: Redis,
    *,
    redis_info: _RedisConnectionInfo,
    namespace: str,
    host: str,
    port: int,
    interface: str | None = None
)

Bases: ExchangeTransportMixin, NoPickleMixin

Hybrid exchange transport bound to a specific mailbox.

Source code in academy/exchange/hybrid.py
def __init__(  # noqa: PLR0913
    self,
    mailbox_id: EntityId,
    redis_client: redis.asyncio.Redis,
    *,
    redis_info: _RedisConnectionInfo,
    namespace: str,
    host: str,
    port: int,
    interface: str | None = None,
) -> None:
    self._mailbox_id = mailbox_id
    self._redis_client = redis_client
    self._redis_info = redis_info
    self._namespace = namespace
    self._host = host
    self._port = port
    self._interface = interface

    self._address_cache: dict[EntityId, str] = {}
    if sys.version_info >= (3, 13):  # pragma: >=3.13 cover
        self._messages: AsyncQueue[Message[Any]] = Queue()
    else:  # pragma: <3.13 cover
        self._messages: AsyncQueue[Message[Any]] = Queue().async_q
    self._socket_pool = SocketPool()
    self._started = asyncio.Event()
    self._shutdown = asyncio.Event()

    self._server = SimpleSocketServer(
        handler=self._direct_message_handler,
        host=host,
        port=port,
    )
    self._server_task = asyncio.create_task(
        self._run_direct_server(),
        name=f'hybrid-transport-direct-server-{self.mailbox_id}',
    )
    self._redis_task = asyncio.create_task(
        self._run_redis_listener(),
        name=f'hybrid-transport-redis-watcher-{self.mailbox_id}',
    )

new async classmethod

new(
    *,
    namespace: str,
    redis_info: _RedisConnectionInfo,
    interface: str | None = None,
    mailbox_id: EntityId | None = None,
    name: str | None = None,
    port: int | None = None
) -> Self

Instantiate a new transport.

Parameters:

  • namespace (str) –

    Redis key namespace.

  • redis_info (_RedisConnectionInfo) –

    Redis connection information.

  • interface (str | None, default: None ) –

    Network interface to use for peer-to-peer communication. If None, the hostname of the local host is used.

  • mailbox_id (EntityId | None, default: None ) –

    Bind the transport to the specific mailbox. If None, a new user entity will be registered and the transport will be bound to that mailbox.

  • name (str | None, default: None ) –

    Display name of the registered entity if mailbox_id is None.

  • port (int | None, default: None ) –

    Port on which to listen for peer connections.

Returns:

  • Self

    An instantiated transport bound to a specific mailbox.

Raises:

  • redis.exceptions.ConnectionError

    If the Redis server is not reachable.

Source code in academy/exchange/hybrid.py
@classmethod
async def new(  # noqa: PLR0913
    cls,
    *,
    namespace: str,
    redis_info: _RedisConnectionInfo,
    interface: str | None = None,
    mailbox_id: EntityId | None = None,
    name: str | None = None,
    port: int | None = None,
) -> Self:
    """Instantiate a new transport.

    Args:
        namespace: Redis key namespace.
        redis_info: Redis connection information.
        interface: Network interface to use for peer-to-peer communication.
            If `None`, the hostname of the local host is used.
        mailbox_id: Bind the transport to the specific mailbox. If `None`,
            a new user entity will be registered and the transport will be
            bound to that mailbox.
        name: Display name of the registered entity if `mailbox_id` is
            `None`.
        port: Port on which to listen for peer connections.

    Returns:
        An instantiated transport bound to a specific mailbox.

    Raises:
        redis.exceptions.ConnectionError: If the Redis server is not
            reachable.
    """
    host = (
        address_by_interface(interface)
        if interface is not None
        else address_by_hostname()
    )
    port = port if port is not None else open_port()

    client = redis.asyncio.Redis(
        host=redis_info.hostname,
        port=redis_info.port,
        decode_responses=False,
        **redis_info.kwargs,
    )
    # Ensure the redis server is reachable else fail early
    await client.ping()

    if mailbox_id is None:
        mailbox_id = UserId.new(name=name)
        await client.set(
            f'{namespace}:status:{uuid_to_base32(mailbox_id.uid)}',
            _MailboxState.ACTIVE.value,
        )
        logger.info('Registered %s in exchange', mailbox_id)

    await client.set(
        f'{namespace}:address:{uuid_to_base32(mailbox_id.uid)}',
        f'{host}:{port}',
    )

    transport = cls(
        mailbox_id,
        client,
        redis_info=redis_info,
        namespace=namespace,
        interface=interface,
        host=host,
        port=port,
    )
    # Wait for the direct message server to start
    await asyncio.wait_for(transport._started.wait(), timeout=5)
    return transport
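
The Redis keys written by new() follow a simple namespace:kind:uid layout. The sketch below reproduces only that layout. The implementation of uuid_to_base32() is not shown on this page, so encode_uid is an assumed stand-in whose output may differ from the real helper, and status_key, address_key, and address_value are hypothetical names.

```python
import base64
import uuid


def encode_uid(uid: uuid.UUID) -> str:
    # Stand-in for uuid_to_base32(); the real encoding may differ.
    return base64.b32encode(uid.bytes).decode('ascii').rstrip('=')


def status_key(namespace: str, uid: uuid.UUID) -> str:
    # Key holding the mailbox state.
    return f'{namespace}:status:{encode_uid(uid)}'


def address_key(namespace: str, uid: uuid.UUID) -> str:
    # Key holding the peer-to-peer address of the mailbox owner.
    return f'{namespace}:address:{encode_uid(uid)}'


def address_value(host: str, port: int) -> str:
    # Value stored under the address key.
    return f'{host}:{port}'


uid = uuid.UUID(int=1)
keys = (status_key('default', uid), address_key('default', uid))
value = address_value('10.0.0.5', 5000)
```

Because every key starts with the namespace, two factories that use different namespaces on the same Redis server do not see each other's mailboxes.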

LocalExchangeFactory

LocalExchangeFactory(
    *, _state: _LocalExchangeState | None = None
)

Bases: ExchangeFactory[LocalExchangeTransport], NoPickleMixin

Local exchange client factory.

A local exchange can be used to pass messages between agents running in separate threads of a single process.

Source code in academy/exchange/local.py
def __init__(
    self,
    *,
    _state: _LocalExchangeState | None = None,
):
    self._state = _LocalExchangeState() if _state is None else _state

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = await factory.create_user_client()
registration = await user_client.register_agent(...)
agent_client = await factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler[RequestT_co]) –

    Agent request message handler.

Returns:

  • AgentExchangeClient[AgentT, ExchangeTransportT]

    Agent exchange client.

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/factory.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = await factory.create_user_client()
    registration = await user_client.register_agent(...)
    agent_client = await factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    assert transport.mailbox_id == agent_id
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        await transport.close()
        raise BadEntityIdError(agent_id)
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Returns:

  • UserExchangeClient[ExchangeTransportT]

    User exchange client.

Source code in academy/exchange/factory.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a new user in the exchange and associated client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Start a message listener thread.

    Returns:
        User exchange client.
    """
    transport = await self._create_transport(mailbox_id=None, name=name)
    user_id = transport.mailbox_id
    assert isinstance(user_id, UserId)
    return UserExchangeClient(
        user_id,
        transport,
        start_listener=start_listener,
    )

LocalExchangeTransport

LocalExchangeTransport(
    mailbox_id: EntityId, state: _LocalExchangeState
)

Bases: ExchangeTransportMixin, NoPickleMixin

Local exchange transport bound to a specific mailbox.

Source code in academy/exchange/local.py
def __init__(
    self,
    mailbox_id: EntityId,
    state: _LocalExchangeState,
) -> None:
    self._mailbox_id = mailbox_id
    self._state = state

new classmethod

new(
    mailbox_id: EntityId | None = None,
    *,
    name: str | None = None,
    state: _LocalExchangeState
) -> Self

Instantiate a new transport.

Parameters:

  • mailbox_id (EntityId | None, default: None ) –

    Bind the transport to the specific mailbox. If None, a new user entity will be registered and the transport will be bound to that mailbox.

  • name (str | None, default: None ) –

    Display name of the registered entity if mailbox_id is None.

  • state (_LocalExchangeState) –

    Shared state among exchange clients.

Returns:

  • Self

    An instantiated transport bound to a specific mailbox.

Source code in academy/exchange/local.py
@classmethod
def new(
    cls,
    mailbox_id: EntityId | None = None,
    *,
    name: str | None = None,
    state: _LocalExchangeState,
) -> Self:
    """Instantiate a new transport.

    Args:
        mailbox_id: Bind the transport to the specific mailbox. If `None`,
            a new user entity will be registered and the transport will be
            bound to that mailbox.
        name: Display name of the registered entity if `mailbox_id` is
            `None`.
        state: Shared state among exchange clients.

    Returns:
        An instantiated transport bound to a specific mailbox.
    """
    if mailbox_id is None:
        mailbox_id = UserId.new(name=name)
        state.queues[mailbox_id] = Queue().async_q
        state.locks[mailbox_id] = Lock()
        logger.info('Registered %s in exchange', mailbox_id)
    return cls(mailbox_id, state)
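
The shared-state idea behind the local transport can be illustrated with a toy version. This is a simplified sketch, not the library's implementation: SharedState and ToyTransport are hypothetical, plain asyncio.Queue objects replace the queue type used in the source, and locks and agent registration are left out.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class SharedState:
    """Hypothetical stand-in for the state shared by all transports."""

    queues: dict = field(default_factory=dict)


class ToyTransport:
    def __init__(self, mailbox_id: str, state: SharedState) -> None:
        self.mailbox_id = mailbox_id
        self._state = state

    @classmethod
    def new(cls, mailbox_id: str, state: SharedState) -> 'ToyTransport':
        # Create the mailbox queue the first time the ID is seen.
        state.queues.setdefault(mailbox_id, asyncio.Queue())
        return cls(mailbox_id, state)

    async def send(self, dest: str, body: str) -> None:
        # Delivery is just a put on the destination's queue.
        await self._state.queues[dest].put((self.mailbox_id, body))

    async def recv(self) -> tuple:
        return await self._state.queues[self.mailbox_id].get()


async def demo() -> tuple:
    state = SharedState()
    alice = ToyTransport.new('alice', state)
    bob = ToyTransport.new('bob', state)
    await alice.send('bob', 'hello')
    return await bob.recv()


received = asyncio.run(demo())
```

Every transport holds a reference to the same state object, which is why this style of exchange only works inside one process.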

ProxyStoreExchangeFactory

ProxyStoreExchangeFactory(
    base: ExchangeFactory[ExchangeTransportT],
    store: Store[Any] | None,
    should_proxy: Callable[[Any], bool],
    *,
    resolve_async: bool = False
)

Bases: ExchangeFactory[ProxyStoreExchangeTransport[ExchangeTransportT]]

ProxyStore exchange client factory.

A ProxyStore exchange is used to wrap an underlying exchange so large objects may be passed by reference.

Parameters:

  • base (ExchangeFactory[ExchangeTransportT]) –

    Base exchange factory.

  • store (Store[Any] | None) –

    Store to use for proxying data.

  • should_proxy (Callable[[Any], bool]) –

    A callable that returns True if an object should be proxied. This is applied to every positional and keyword argument and result value.

  • resolve_async (bool, default: False ) –

    Resolve proxies asynchronously when received.
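
One possible should_proxy callable is a size threshold. The sketch below is an illustration only: proxy_if_larger_than is a hypothetical helper, the one-megabyte threshold is arbitrary, and sys.getsizeof() measures only the shallow size of an object.

```python
import sys
from typing import Any, Callable


def proxy_if_larger_than(threshold_bytes: int) -> Callable[[Any], bool]:
    """Build a predicate that flags objects above a size threshold."""

    def should_proxy(obj: Any) -> bool:
        # sys.getsizeof() is shallow: it ignores objects referenced by obj.
        return sys.getsizeof(obj) > threshold_bytes

    return should_proxy


should_proxy = proxy_if_larger_than(1_000_000)
small = should_proxy('ping')
large = should_proxy(bytes(2_000_000))
```

Since the predicate is applied to every argument and result value, it should be cheap to evaluate.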

Source code in academy/exchange/proxystore.py
def __init__(
    self,
    base: ExchangeFactory[ExchangeTransportT],
    store: Store[Any] | None,
    should_proxy: Callable[[Any], bool],
    *,
    resolve_async: bool = False,
) -> None:
    self.base = base
    self.store = store
    self.should_proxy = should_proxy
    self.resolve_async = resolve_async

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = await factory.create_user_client()
registration = await user_client.register_agent(...)
agent_client = await factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler[RequestT_co]) –

    Agent request message handler.

Returns:

  • AgentExchangeClient[AgentT, ExchangeTransportT]

    Agent exchange client.

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/factory.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = await factory.create_user_client()
    registration = await user_client.register_agent(...)
    agent_client = await factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    assert transport.mailbox_id == agent_id
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        await transport.close()
        raise BadEntityIdError(agent_id)
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Returns:

  • UserExchangeClient[ExchangeTransportT]

    User exchange client.

Source code in academy/exchange/factory.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a new user in the exchange and associated client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Start a message listener thread.

    Returns:
        User exchange client.
    """
    transport = await self._create_transport(mailbox_id=None, name=name)
    user_id = transport.mailbox_id
    assert isinstance(user_id, UserId)
    return UserExchangeClient(
        user_id,
        transport,
        start_listener=start_listener,
    )

ProxyStoreExchangeTransport

ProxyStoreExchangeTransport(
    transport: ExchangeTransportT,
    store: Store[Any],
    should_proxy: Callable[[Any], bool],
    *,
    resolve_async: bool = False
)

Bases: ExchangeTransportMixin, NoPickleMixin, Generic[ExchangeTransportT]

ProxyStore exchange transport bound to a specific mailbox.

Source code in academy/exchange/proxystore.py
def __init__(
    self,
    transport: ExchangeTransportT,
    store: Store[Any],
    should_proxy: Callable[[Any], bool],
    *,
    resolve_async: bool = False,
) -> None:
    self.transport = transport
    self.store = store
    self.should_proxy = should_proxy
    self.resolve_async = resolve_async
    register_store(store, exist_ok=True)

RedisExchangeFactory

RedisExchangeFactory(
    hostname: str, port: int, **redis_kwargs: Any
)

Bases: ExchangeFactory[RedisExchangeTransport]

Redis exchange client factory.

Parameters:

  • hostname (str) –

    Redis server hostname.

  • port (int) –

    Redis server port.

  • redis_kwargs (Any, default: {} ) –

    Extra keyword arguments to pass to redis.asyncio.Redis().

Source code in academy/exchange/redis.py
def __init__(
    self,
    hostname: str,
    port: int,
    **redis_kwargs: Any,
) -> None:
    self.redis_info = _RedisConnectionInfo(hostname, port, redis_kwargs)

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = await factory.create_user_client()
registration = await user_client.register_agent(...)
agent_client = await factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler[RequestT_co]) –

    Agent request message handler.

Returns:

  • AgentExchangeClient[AgentT, ExchangeTransportT]

    Agent exchange client.

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/factory.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = await factory.create_user_client()
    registration = await user_client.register_agent(...)
    agent_client = await factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    assert transport.mailbox_id == agent_id
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        await transport.close()
        raise BadEntityIdError(agent_id)
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Returns:

  • UserExchangeClient[ExchangeTransportT]

    User exchange client.

Source code in academy/exchange/factory.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a new user in the exchange and associated client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Start a message listener thread.

    Returns:
        User exchange client.
    """
    transport = await self._create_transport(mailbox_id=None, name=name)
    user_id = transport.mailbox_id
    assert isinstance(user_id, UserId)
    return UserExchangeClient(
        user_id,
        transport,
        start_listener=start_listener,
    )

RedisExchangeTransport

RedisExchangeTransport(
    mailbox_id: EntityId,
    redis_client: Redis,
    *,
    redis_info: _RedisConnectionInfo
)

Bases: ExchangeTransportMixin, NoPickleMixin

Redis exchange transport bound to a specific mailbox.

Source code in academy/exchange/redis.py
def __init__(
    self,
    mailbox_id: EntityId,
    redis_client: redis.asyncio.Redis,
    *,
    redis_info: _RedisConnectionInfo,
) -> None:
    self._mailbox_id = mailbox_id
    self._client = redis_client
    self._redis_info = redis_info

new async classmethod

new(
    *,
    mailbox_id: EntityId | None = None,
    name: str | None = None,
    redis_info: _RedisConnectionInfo
) -> Self

Instantiate a new transport.

Parameters:

  • mailbox_id (EntityId | None, default: None ) –

    Bind the transport to the specific mailbox. If None, a new user entity will be registered and the transport will be bound to that mailbox.

  • name (str | None, default: None ) –

    Display name of the registered entity if mailbox_id is None.

  • redis_info (_RedisConnectionInfo) –

    Redis connection information.

Returns:

  • Self

    An instantiated transport bound to a specific mailbox.

Raises:

  • redis.exceptions.ConnectionError

    If the Redis server is not reachable.

Source code in academy/exchange/redis.py
@classmethod
async def new(
    cls,
    *,
    mailbox_id: EntityId | None = None,
    name: str | None = None,
    redis_info: _RedisConnectionInfo,
) -> Self:
    """Instantiate a new transport.

    Args:
        mailbox_id: Bind the transport to the specific mailbox. If `None`,
            a new user entity will be registered and the transport will be
            bound to that mailbox.
        name: Display name of the registered entity if `mailbox_id` is
            `None`.
        redis_info: Redis connection information.

    Returns:
        An instantiated transport bound to a specific mailbox.

    Raises:
        redis.exceptions.ConnectionError: If the Redis server is not
            reachable.
    """
    client = redis.asyncio.Redis(
        host=redis_info.hostname,
        port=redis_info.port,
        decode_responses=False,
        **redis_info.kwargs,
    )
    # Ensure the redis server is reachable else fail early
    await client.ping()

    if mailbox_id is None:
        mailbox_id = UserId.new(name=name)
        await client.set(
            f'active:{mailbox_id.uid}',
            _MailboxState.ACTIVE.value,
        )
        logger.info('Registered %s in exchange', mailbox_id)
    return cls(mailbox_id, client, redis_info=redis_info)

ExchangeTransport

Bases: Protocol[AgentRegistrationT_co]

Low-level exchange communicator.

A message exchange hosts mailboxes for each entity (i.e., agent or user) in a multi-agent system. This transport protocol defines mechanisms for entity management (e.g., registration, discovery, status, termination) and for sending/receiving messages from a mailbox. As such, each transport instance is "bound" to a specific mailbox in the exchange.

Warning

A specific exchange transport should not be replicated because multiple client instances receiving from the same mailbox produce undefined agent behavior.

mailbox_id property

mailbox_id: EntityId

ID of the mailbox this client is bound to.

close async

close() -> None

Close the exchange client.

Note

This does not alter the state of the mailbox this client is bound to. I.e., the mailbox will not be terminated.

Source code in academy/exchange/transport.py
async def close(self) -> None:
    """Close the exchange client.

    Note:
        This does not alter the state of the mailbox this client is bound
        to. I.e., the mailbox will not be terminated.
    """
    ...

discover async

discover(
    agent: type[Agent], *, allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents of a given agent type.

Warning

Implementations of this method are often O(n) and scan the types of all agents registered to the exchange.

Parameters:

  • agent (type[Agent]) –

    Agent type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

  • tuple[AgentId[Any], ...]

    Tuple of agent IDs implementing the agent.

Raises:

  • ExchangeError

    Error returned by the exchange.

Source code in academy/exchange/transport.py
async def discover(
    self,
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents of a given agent type.

    Warning:
        Implementations of this method are often O(n) and scan the types
        of all agents registered to the exchange.

    Args:
        agent: Agent type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent IDs implementing the agent.

    Raises:
        ExchangeError: Error returned by the exchange.
    """
    ...
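
The O(n) scan mentioned in the warning can be sketched with a plain dictionary. This is an assumption-laden illustration, not how any particular exchange stores agent types: the registry mapping, the string IDs, and the Agent, Worker, and GpuWorker classes are all hypothetical.

```python
class Agent:
    """Local stand-in for the agent base class."""


class Worker(Agent):
    pass


class GpuWorker(Worker):
    pass


def discover(
    registry: dict,
    agent: type,
    *,
    allow_subclasses: bool = True,
) -> tuple:
    found = []
    # Every registered agent is inspected once, hence O(n).
    for agent_id, agent_type in registry.items():
        if agent_type is agent:
            found.append(agent_id)
        elif allow_subclasses and issubclass(agent_type, agent):
            found.append(agent_id)
    return tuple(found)


registry = {'a1': Worker, 'a2': GpuWorker, 'a3': Agent}
with_subclasses = discover(registry, Worker)
exact_only = discover(registry, Worker, allow_subclasses=False)
```

With allow_subclasses=False only exact type matches are returned, which is useful when a subclass changes the behavior a caller depends on.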

factory

factory() -> ExchangeFactory[Self]

Get an exchange factory.

Source code in academy/exchange/transport.py
def factory(self) -> ExchangeFactory[Self]:
    """Get an exchange factory."""
    ...

recv async

recv(timeout: float | None = None) -> Message[Any]

Receive the next message sent to the mailbox.

This blocks until the next message is received, there is a timeout, or the mailbox is terminated.

Parameters:

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for the next message. If None, the default, block forever until the next message or the mailbox is closed.

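Raises:

  • MailboxTerminatedError

    If the mailbox was closed.

  • ExchangeError

    Error returned by the exchange.

  • TimeoutError

    If a timeout was specified and exceeded.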

Source code in academy/exchange/transport.py
async def recv(self, timeout: float | None = None) -> Message[Any]:
    """Receive the next message sent to the mailbox.

    This blocks until the next message is received, there is a timeout, or
    the mailbox is terminated.

    Args:
        timeout: Optional timeout in seconds to wait for the next
            message. If `None`, the default, block forever until the
            next message or the mailbox is closed.

    Raises:
        MailboxTerminatedError: If the mailbox was closed.
        ExchangeError: Error returned by the exchange.
        TimeoutError: If a `timeout` was specified and exceeded.
    """
    ...
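
The timeout contract of recv() can be approximated with asyncio.wait_for() over a queue. This is a sketch of one way an implementation might honor the contract, not the code of any transport in this module. The free function recv and its queue argument are hypothetical, and mailbox termination is not modeled.

```python
import asyncio


async def recv(queue: asyncio.Queue, timeout=None):
    try:
        # timeout=None waits until a message arrives.
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        # Normalize to the built-in TimeoutError on older Python versions.
        raise TimeoutError(f'no message within {timeout} seconds') from None


async def demo() -> tuple:
    queue = asyncio.Queue()
    await queue.put('first')
    message = await recv(queue, timeout=0.1)
    try:
        await recv(queue, timeout=0.05)
        timed_out = False
    except TimeoutError:
        timed_out = True
    return message, timed_out


outcome = asyncio.run(demo())
```

Callers that poll with a short timeout should treat TimeoutError as "nothing yet" and keep MailboxTerminatedError as the signal to stop.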

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistrationT_co

Register a new agent and associated mailbox with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

  • AgentRegistrationT_co

    Agent registration info.

Raises:

  • ExchangeError

    Error returned by the exchange.

Source code in academy/exchange/transport.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistrationT_co:
    """Register a new agent and associated mailbox with the exchange.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info.

    Raises:
        ExchangeError: Error returned by the exchange.
    """
    ...

send async

send(message: Message[Any]) -> None

Send a message to a mailbox.

Parameters:

  • message (Message[Any]) –

    Message to send.

Raises:

  • BadEntityIdError

    If a mailbox for message.dest does not exist.

  • MailboxTerminatedError

    If the mailbox was closed.

  • ExchangeError

    Error returned by the exchange.

Source code in academy/exchange/transport.py
async def send(self, message: Message[Any]) -> None:
    """Send a message to a mailbox.

    Args:
        message: Message to send.

    Raises:
        BadEntityIdError: If a mailbox for `message.dest` does not exist.
        MailboxTerminatedError: If the mailbox was closed.
        ExchangeError: Error returned by the exchange.
    """
    ...

status async

status(uid: EntityId) -> MailboxStatus

Check the status of a mailbox in the exchange.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to check.

Raises:

  • ExchangeError

    Error returned by the exchange.

Source code in academy/exchange/transport.py
async def status(self, uid: EntityId) -> MailboxStatus:
    """Check the status of a mailbox in the exchange.

    Args:
        uid: Entity identifier of the mailbox to check.

    Raises:
        ExchangeError: Error returned by the exchange.
    """
    ...

terminate async

terminate(uid: EntityId) -> None

Terminate a mailbox in the exchange.

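Once an entity's mailbox is terminated:

  • All request messages in the mailbox will be replied to with a MailboxTerminatedError.
  • All calls to recv() will raise a MailboxTerminatedError.
  • All attempts to send() to this mailbox by other entities will raise a MailboxTerminatedError.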

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Raises:

  • ExchangeError

    Error returned by the exchange.

Source code in academy/exchange/transport.py
async def terminate(self, uid: EntityId) -> None:
    """Terminate a mailbox in the exchange.

    Once an entity's mailbox is terminated:

    * All request messages in the mailbox will be replied to with a
      [`MailboxTerminatedError`][academy.exception.MailboxTerminatedError].
    * All calls to
      [`recv()`][academy.exchange.transport.ExchangeTransport.recv]
      will raise a
      [`MailboxTerminatedError`][academy.exception.MailboxTerminatedError].
    * All attempts to
      [`send()`][academy.exchange.transport.ExchangeTransport.send]
      to this mailbox by other entities will raise a
      [`MailboxTerminatedError`][academy.exception.MailboxTerminatedError].

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.

    Raises:
        ExchangeError: Error returned by the exchange.
    """
    ...
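
The termination rules can be exercised against a toy exchange. The sketch below is hypothetical throughout: ToyExchange is synchronous for brevity, the enum and exceptions are redefined locally, and replying to pending requests is not modeled.

```python
import enum


class MailboxStatus(enum.Enum):
    MISSING = 'MISSING'
    ACTIVE = 'ACTIVE'
    TERMINATED = 'TERMINATED'


class BadEntityIdError(Exception):
    """Local stand-in: no mailbox exists for the ID."""


class MailboxTerminatedError(Exception):
    """Local stand-in: the mailbox was terminated."""


class ToyExchange:
    """Hypothetical in-memory exchange, synchronous for brevity."""

    def __init__(self) -> None:
        self._mailboxes: dict = {}
        self._terminated: set = set()

    def register(self, uid: str) -> None:
        self._mailboxes[uid] = []

    def status(self, uid: str) -> MailboxStatus:
        if uid in self._terminated:
            return MailboxStatus.TERMINATED
        if uid in self._mailboxes:
            return MailboxStatus.ACTIVE
        return MailboxStatus.MISSING

    def send(self, uid: str, message: str) -> None:
        status = self.status(uid)
        if status is MailboxStatus.MISSING:
            raise BadEntityIdError(uid)
        if status is MailboxStatus.TERMINATED:
            raise MailboxTerminatedError(uid)
        self._mailboxes[uid].append(message)

    def terminate(self, uid: str) -> None:
        # No-op unless the mailbox exists and is still active.
        if self.status(uid) is not MailboxStatus.ACTIVE:
            return
        self._terminated.add(uid)


exchange = ToyExchange()
exchange.register('agent-a')
exchange.send('agent-a', 'ping')
exchange.terminate('agent-a')
exchange.terminate('never-registered')  # silently ignored
try:
    exchange.send('agent-a', 'too late')
    send_rejected = False
except MailboxTerminatedError:
    send_rejected = True
```

Keeping terminated IDs in a separate set lets status() distinguish a mailbox that never existed from one that was shut down.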

MailboxStatus

Bases: Enum

Exchange mailbox status.

MISSING class-attribute instance-attribute

MISSING = 'MISSING'

Mailbox does not exist.

ACTIVE class-attribute instance-attribute

ACTIVE = 'ACTIVE'

Mailbox exists and is accepting messages.

TERMINATED class-attribute instance-attribute

TERMINATED = 'TERMINATED'

Mailbox was terminated and no longer accepts messages.