Skip to content

academy.exchange

ExchangeFactory

Bases: ABC, Generic[ExchangeTransportT]

Exchange client factory.

An exchange factory is used to mint new exchange clients for users and agents, encapsulating the complexities of instantiating the underlying communication classes (the ExchangeTransport).

Warning

Factory implementations must be efficiently pickleable because factory instances are shared between user and agent processes so that all entities can create clients to the same exchange.

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler,
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = factory.create_user_client()
registration = user_client.register_agent(...)
agent_client = factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler) –

    Agent request message handler.

Returns:

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/__init__.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler,
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = factory.create_user_client()
    registration = user_client.register_agent(...)
    agent_client = factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    assert transport.mailbox_id == agent_id
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        await transport.close()
        raise BadEntityIdError(agent_id)
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Returns:

Source code in academy/exchange/__init__.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a new user in the exchange and associated client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Start a message listener thread.

    Returns:
        User exchange client.
    """
    transport = await self._create_transport(mailbox_id=None, name=name)
    user_id = transport.mailbox_id
    assert isinstance(user_id, UserId)
    return UserExchangeClient(
        user_id,
        transport,
        start_listener=start_listener,
    )

ExchangeClient

ExchangeClient(transport: ExchangeTransportT)

Bases: ABC, Generic[ExchangeTransportT]

Base exchange client.

Warning

Exchange clients should only be created via ExchangeFactory.create_agent_client() or ExchangeFactory.create_user_client()!

Parameters:

Source code in academy/exchange/__init__.py
def __init__(
    self,
    transport: ExchangeTransportT,
) -> None:
    self._transport = transport
    self._handles: dict[uuid.UUID, RemoteHandle[Any]] = {}
    self._close_lock = asyncio.Lock()
    self._closed = False

client_id abstractmethod property

client_id: EntityId

Client ID as registered with the exchange.

close abstractmethod async

close() -> None

Close the transport.

Source code in academy/exchange/__init__.py
@abc.abstractmethod
async def close(self) -> None:
    """Close the transport."""
    ...

discover async

discover(
    agent: type[Agent], *, allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents with a given agent.

Parameters:

  • agent (type[Agent]) –

    Agent type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

Source code in academy/exchange/__init__.py
async def discover(
    self,
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given agent.

    Args:
        agent: Agent type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent IDs implementing the agent.
    """
    return await self._transport.discover(
        agent,
        allow_subclasses=allow_subclasses,
    )

factory

Get an exchange factory.

Source code in academy/exchange/__init__.py
def factory(self) -> ExchangeFactory[ExchangeTransportT]:
    """Get an exchange factory."""
    return self._transport.factory()

get_handle

get_handle(aid: AgentId[AgentT]) -> RemoteHandle[AgentT]

Create a new handle to an agent.

A handle acts like a reference to a remote agent, enabling a user to manage the agent or asynchronously invoke actions.

Parameters:

  • aid (AgentId[AgentT]) –

    Agent to create an handle to. The agent must be registered with the same exchange.

Returns:

Raises:

Source code in academy/exchange/__init__.py
def get_handle(self, aid: AgentId[AgentT]) -> RemoteHandle[AgentT]:
    """Create a new handle to an agent.

    A handle acts like a reference to a remote agent, enabling a user
    to manage the agent or asynchronously invoke actions.

    Args:
        aid: Agent to create an handle to. The agent must be registered
            with the same exchange.

    Returns:
        Handle to the agent.

    Raises:
        TypeError: if `aid` is not an instance of
            [`AgentId`][academy.identifier.AgentId].
    """
    if not isinstance(aid, AgentId):
        raise TypeError(
            f'Handle must be created from an {AgentId.__name__} '
            f'but got identifier with type {type(aid).__name__}.',
        )
    handle = RemoteHandle(self, aid)
    self._handles[handle.handle_id] = handle
    logger.info('Created handle to %s', aid)
    return handle

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]

Register a new agent and associated mailbox with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

Source code in academy/exchange/__init__.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistration[AgentT]:
    """Register a new agent and associated mailbox with the exchange.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info.
    """
    registration = await self._transport.register_agent(
        agent,
        name=name,
    )
    logger.info('Registered %s in exchange', registration.agent_id)
    return registration

send async

send(message: Message) -> None

Send a message to a mailbox.

Parameters:

  • message (Message) –

    Message to send.

Raises:

Source code in academy/exchange/__init__.py
async def send(self, message: Message) -> None:
    """Send a message to a mailbox.

    Args:
        message: Message to send.

    Raises:
        BadEntityIdError: If a mailbox for `message.dest` does not exist.
        MailboxTerminatedError: If the mailbox was closed.
    """
    await self._transport.send(message)
    logger.debug('Sent %s to %s', type(message).__name__, message.dest)

status async

status(uid: EntityId) -> MailboxStatus

Check the status of a mailbox in the exchange.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to check.

Source code in academy/exchange/__init__.py
async def status(self, uid: EntityId) -> MailboxStatus:
    """Check the status of a mailbox in the exchange.

    Args:
        uid: Entity identifier of the mailbox to check.
    """
    return await self._transport.status(uid)

terminate async

terminate(uid: EntityId) -> None

Terminate a mailbox in the exchange.

Terminating a mailbox means that the corresponding entity will no longer be able to receive messages.

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/__init__.py
async def terminate(self, uid: EntityId) -> None:
    """Terminate a mailbox in the exchange.

    Terminating a mailbox means that the corresponding entity will no
    longer be able to receive messages.

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    await self._transport.terminate(uid)

AgentExchangeClient

AgentExchangeClient(
    agent_id: AgentId[AgentT],
    transport: ExchangeTransportT,
    request_handler: RequestHandler,
)

Bases: ExchangeClient[ExchangeTransportT], Generic[AgentT, ExchangeTransportT]

Agent exchange client.

Warning

Agent exchange clients should only be created via ExchangeFactory.create_agent_client()!

Parameters:

  • agent_id (AgentId[AgentT]) –

    Agent ID.

  • transport (ExchangeTransportT) –

    Exchange transport bound to agent_id.

  • request_handler (RequestHandler) –

    Request handler of the agent that will be called for each message received to this agent's mailbox. start_listener: Start a message listener thread.

Source code in academy/exchange/__init__.py
def __init__(
    self,
    agent_id: AgentId[AgentT],
    transport: ExchangeTransportT,
    request_handler: RequestHandler,
) -> None:
    super().__init__(transport)
    self._agent_id = agent_id
    self._request_handler = request_handler

client_id property

client_id: AgentId[AgentT]

Agent ID of the client.

close async

close() -> None

Close the user client.

This closes the underlying exchange transport and all handles created by this client. The agent's mailbox will not be terminated so the agent can be started again later.

Source code in academy/exchange/__init__.py
async def close(self) -> None:
    """Close the user client.

    This closes the underlying exchange transport and all handles created
    by this client. The agent's mailbox will not be terminated so the agent
    can be started again later.
    """
    async with self._close_lock:
        if self._closed:
            return

        await self._close_handles()
        await self._transport.close()
        self._closed = True
        logger.info('Closed exchange client for %s', self.client_id)

discover async

discover(
    agent: type[Agent], *, allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents with a given agent.

Parameters:

  • agent (type[Agent]) –

    Agent type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

Source code in academy/exchange/__init__.py
async def discover(
    self,
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given agent.

    Args:
        agent: Agent type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent IDs implementing the agent.
    """
    return await self._transport.discover(
        agent,
        allow_subclasses=allow_subclasses,
    )

factory

Get an exchange factory.

Source code in academy/exchange/__init__.py
def factory(self) -> ExchangeFactory[ExchangeTransportT]:
    """Get an exchange factory."""
    return self._transport.factory()

get_handle

get_handle(aid: AgentId[AgentT]) -> RemoteHandle[AgentT]

Create a new handle to an agent.

A handle acts like a reference to a remote agent, enabling a user to manage the agent or asynchronously invoke actions.

Parameters:

  • aid (AgentId[AgentT]) –

    Agent to create an handle to. The agent must be registered with the same exchange.

Returns:

Raises:

Source code in academy/exchange/__init__.py
def get_handle(self, aid: AgentId[AgentT]) -> RemoteHandle[AgentT]:
    """Create a new handle to an agent.

    A handle acts like a reference to a remote agent, enabling a user
    to manage the agent or asynchronously invoke actions.

    Args:
        aid: Agent to create an handle to. The agent must be registered
            with the same exchange.

    Returns:
        Handle to the agent.

    Raises:
        TypeError: if `aid` is not an instance of
            [`AgentId`][academy.identifier.AgentId].
    """
    if not isinstance(aid, AgentId):
        raise TypeError(
            f'Handle must be created from an {AgentId.__name__} '
            f'but got identifier with type {type(aid).__name__}.',
        )
    handle = RemoteHandle(self, aid)
    self._handles[handle.handle_id] = handle
    logger.info('Created handle to %s', aid)
    return handle

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]

Register a new agent and associated mailbox with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

Source code in academy/exchange/__init__.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistration[AgentT]:
    """Register a new agent and associated mailbox with the exchange.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info.
    """
    registration = await self._transport.register_agent(
        agent,
        name=name,
    )
    logger.info('Registered %s in exchange', registration.agent_id)
    return registration

send async

send(message: Message) -> None

Send a message to a mailbox.

Parameters:

  • message (Message) –

    Message to send.

Raises:

Source code in academy/exchange/__init__.py
async def send(self, message: Message) -> None:
    """Send a message to a mailbox.

    Args:
        message: Message to send.

    Raises:
        BadEntityIdError: If a mailbox for `message.dest` does not exist.
        MailboxTerminatedError: If the mailbox was closed.
    """
    await self._transport.send(message)
    logger.debug('Sent %s to %s', type(message).__name__, message.dest)

status async

status(uid: EntityId) -> MailboxStatus

Check the status of a mailbox in the exchange.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to check.

Source code in academy/exchange/__init__.py
async def status(self, uid: EntityId) -> MailboxStatus:
    """Check the status of a mailbox in the exchange.

    Args:
        uid: Entity identifier of the mailbox to check.
    """
    return await self._transport.status(uid)

terminate async

terminate(uid: EntityId) -> None

Terminate a mailbox in the exchange.

Terminating a mailbox means that the corresponding entity will no longer be able to receive messages.

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/__init__.py
async def terminate(self, uid: EntityId) -> None:
    """Terminate a mailbox in the exchange.

    Terminating a mailbox means that the corresponding entity will no
    longer be able to receive messages.

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    await self._transport.terminate(uid)

UserExchangeClient

UserExchangeClient(
    user_id: UserId,
    transport: ExchangeTransportT,
    *,
    start_listener: bool = True
)

Bases: ExchangeClient[ExchangeTransportT]

User exchange client.

Warning

User exchange clients should only be created via ExchangeFactory.create_user_client()!

Parameters:

  • user_id (UserId) –

    User ID.

  • transport (ExchangeTransportT) –

    Exchange transport bound to user_id.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Source code in academy/exchange/__init__.py
def __init__(
    self,
    user_id: UserId,
    transport: ExchangeTransportT,
    *,
    start_listener: bool = True,
) -> None:
    super().__init__(transport)
    self._user_id = user_id
    self._listener_task: asyncio.Task[None] | None = None
    if start_listener:
        self._listener_task = asyncio.create_task(
            self._listen_for_messages(),
            name=f'user-exchange-listener-{self.client_id}',
        )

client_id property

client_id: UserId

User ID of the client.

close async

close() -> None

Close the user client.

This terminates the user's mailbox, closes the underlying exchange transport, and closes all handles produced by this client.

Source code in academy/exchange/__init__.py
async def close(self) -> None:
    """Close the user client.

    This terminates the user's mailbox, closes the underlying exchange
    transport, and closes all handles produced by this client.
    """
    async with self._close_lock:
        if self._closed:
            return

        await self._close_handles()
        await self._transport.terminate(self.client_id)
        logger.info(f'Terminated mailbox for {self.client_id}')
        if self._listener_task is not None:
            self._listener_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._listener_task
        await self._transport.close()
        self._closed = True
        logger.info('Closed exchange client for %s', self.client_id)

discover async

discover(
    agent: type[Agent], *, allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents with a given agent.

Parameters:

  • agent (type[Agent]) –

    Agent type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

Source code in academy/exchange/__init__.py
async def discover(
    self,
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given agent.

    Args:
        agent: Agent type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent IDs implementing the agent.
    """
    return await self._transport.discover(
        agent,
        allow_subclasses=allow_subclasses,
    )

factory

Get an exchange factory.

Source code in academy/exchange/__init__.py
def factory(self) -> ExchangeFactory[ExchangeTransportT]:
    """Get an exchange factory."""
    return self._transport.factory()

get_handle

get_handle(aid: AgentId[AgentT]) -> RemoteHandle[AgentT]

Create a new handle to an agent.

A handle acts like a reference to a remote agent, enabling a user to manage the agent or asynchronously invoke actions.

Parameters:

  • aid (AgentId[AgentT]) –

    Agent to create an handle to. The agent must be registered with the same exchange.

Returns:

Raises:

Source code in academy/exchange/__init__.py
def get_handle(self, aid: AgentId[AgentT]) -> RemoteHandle[AgentT]:
    """Create a new handle to an agent.

    A handle acts like a reference to a remote agent, enabling a user
    to manage the agent or asynchronously invoke actions.

    Args:
        aid: Agent to create an handle to. The agent must be registered
            with the same exchange.

    Returns:
        Handle to the agent.

    Raises:
        TypeError: if `aid` is not an instance of
            [`AgentId`][academy.identifier.AgentId].
    """
    if not isinstance(aid, AgentId):
        raise TypeError(
            f'Handle must be created from an {AgentId.__name__} '
            f'but got identifier with type {type(aid).__name__}.',
        )
    handle = RemoteHandle(self, aid)
    self._handles[handle.handle_id] = handle
    logger.info('Created handle to %s', aid)
    return handle

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]

Register a new agent and associated mailbox with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

Source code in academy/exchange/__init__.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistration[AgentT]:
    """Register a new agent and associated mailbox with the exchange.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info.
    """
    registration = await self._transport.register_agent(
        agent,
        name=name,
    )
    logger.info('Registered %s in exchange', registration.agent_id)
    return registration

send async

send(message: Message) -> None

Send a message to a mailbox.

Parameters:

  • message (Message) –

    Message to send.

Raises:

Source code in academy/exchange/__init__.py
async def send(self, message: Message) -> None:
    """Send a message to a mailbox.

    Args:
        message: Message to send.

    Raises:
        BadEntityIdError: If a mailbox for `message.dest` does not exist.
        MailboxTerminatedError: If the mailbox was closed.
    """
    await self._transport.send(message)
    logger.debug('Sent %s to %s', type(message).__name__, message.dest)

status async

status(uid: EntityId) -> MailboxStatus

Check the status of a mailbox in the exchange.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to check.

Source code in academy/exchange/__init__.py
async def status(self, uid: EntityId) -> MailboxStatus:
    """Check the status of a mailbox in the exchange.

    Args:
        uid: Entity identifier of the mailbox to check.
    """
    return await self._transport.status(uid)

terminate async

terminate(uid: EntityId) -> None

Terminate a mailbox in the exchange.

Terminating a mailbox means that the corresponding entity will no longer be able to receive messages.

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/__init__.py
async def terminate(self, uid: EntityId) -> None:
    """Terminate a mailbox in the exchange.

    Terminating a mailbox means that the corresponding entity will no
    longer be able to receive messages.

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    await self._transport.terminate(uid)