academy.exchange.transport

AgentRegistrationT module-attribute

AgentRegistrationT = TypeVar(
    "AgentRegistrationT", bound=AgentRegistration[Any]
)

Type variable bound to AgentRegistration.

ExchangeTransportT module-attribute

ExchangeTransportT = TypeVar(
    "ExchangeTransportT", bound=ExchangeTransport[Any]
)

Type variable bound to ExchangeTransport.

MailboxStatus

Bases: Enum

Exchange mailbox status.

MISSING class-attribute instance-attribute

MISSING = 'MISSING'

Mailbox does not exist.

ACTIVE class-attribute instance-attribute

ACTIVE = 'ACTIVE'

Mailbox exists and is accepting messages.

TERMINATED class-attribute instance-attribute

TERMINATED = 'TERMINATED'

Mailbox was terminated and no longer accepts messages.
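
The three states above map naturally onto a small decision helper. The following is a minimal sketch, assuming a stand-in enum that mirrors the documented members and values; `accepts_messages` is a hypothetical helper and not part of the module.

```python
from enum import Enum


class MailboxStatus(Enum):
    """Stand-in mirroring the documented members and values."""

    MISSING = 'MISSING'
    ACTIVE = 'ACTIVE'
    TERMINATED = 'TERMINATED'


def accepts_messages(status: MailboxStatus) -> bool:
    """Hypothetical helper: only an ACTIVE mailbox accepts messages."""
    return status is MailboxStatus.ACTIVE


# Members can be looked up by their string value, e.g. after deserialization.
restored = MailboxStatus('TERMINATED')
```

Comparing members with `is` works because enum members are singletons.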

AgentRegistration

Bases: Protocol[AgentT]

Agent exchange registration information.

Attributes:

  • agent_id (AgentId[AgentT]) –

    Unique agent identifier returned by the exchange.
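
Because the registration type is a protocol, any object exposing an `agent_id` attribute satisfies it structurally. A rough illustration, assuming simplified stand-ins: `HasAgentId`, `ToyRegistration`, and the `token` field are hypothetical, and the identifier is a plain string rather than a real `AgentId`.

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@runtime_checkable
class HasAgentId(Protocol):
    """Simplified, non-generic stand-in for a registration protocol."""

    agent_id: str


@dataclass
class ToyRegistration:
    """Hypothetical registration carrying an extra, transport-specific field."""

    agent_id: str
    token: str = ''


def agent_id_of(registration: HasAgentId) -> str:
    # Only the attribute required by the protocol is used here.
    return registration.agent_id


registration = ToyRegistration(agent_id='agent-1', token='secret')
```

No inheritance from the protocol is needed; the dataclass matches because it has the required attribute.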

ExchangeTransport

Bases: Protocol[AgentRegistrationT_co]

Low-level exchange communicator.

A message exchange hosts mailboxes for each entity (i.e., agent or user) in a multi-agent system. This transport protocol defines mechanisms for entity management (e.g., registration, discovery, status, termination) and for sending/receiving messages from a mailbox. As such, each transport instance is "bound" to a specific mailbox in the exchange.

Warning

A specific exchange transport should not be replicated because multiple client instances receiving from the same mailbox produce undefined behavior.

mailbox_id property

mailbox_id: EntityId

ID of the mailbox this client is bound to.

close async

close() -> None

Close the exchange client.

Note

This does not alter the state of the mailbox this client is bound to. I.e., the mailbox will not be terminated.

Source code in academy/exchange/transport.py
async def close(self) -> None:
    """Close the exchange client.

    Note:
        This does not alter the state of the mailbox this client is bound
        to. I.e., the mailbox will not be terminated.
    """
    ...

discover async

discover(
    agent: type[Agent], *, allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents of a given agent type.

Warning

Implementations of this method are often O(n) and scan the types of all agents registered to the exchange.

Parameters:

  • agent (type[Agent]) –

    Agent type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

  • tuple[AgentId[Any], ...]

    Tuple of agent IDs implementing the agent.

Raises:

  • ExchangeError

    Error returned by the exchange.

Source code in academy/exchange/transport.py
async def discover(
    self,
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given agent.

    Warning:
        Implementations of this method are often O(n) and scan the types
        of all agents registered to the exchange.

    Args:
        agent: Agent type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent IDs implementing the agent.

    Raises:
        ExchangeError: Error returned by the exchange.
    """
    ...
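
The linear scan mentioned in the warning, and the effect of `allow_subclasses`, can be sketched with a toy in-memory registry. Everything here is an assumption for illustration: `ToyRegistry`, the `Worker` and `GpuWorker` classes, and the use of plain strings as agent IDs are hypothetical and do not reflect any real transport implementation.

```python
import asyncio


class Agent:
    """Stand-in base class; the real Agent type is not reproduced here."""


class Worker(Agent):
    pass


class GpuWorker(Worker):
    pass


class ToyRegistry:
    """Hypothetical in-memory registry mapping agent IDs to agent types."""

    def __init__(self):
        self._agents = {}

    def register(self, agent_id, agent):
        self._agents[agent_id] = agent

    async def discover(self, agent, *, allow_subclasses=True):
        matches = []
        # Linear scan over every registered agent: the O(n) cost noted above.
        for agent_id, registered in self._agents.items():
            if registered is agent:
                matches.append(agent_id)
            elif allow_subclasses and issubclass(registered, agent):
                matches.append(agent_id)
        return tuple(matches)


async def demo():
    registry = ToyRegistry()
    registry.register('w1', Worker)
    registry.register('g1', GpuWorker)
    registry.register('a1', Agent)
    with_subclasses = await registry.discover(Worker)
    exact_only = await registry.discover(Worker, allow_subclasses=False)
    return with_subclasses, exact_only
```

With `allow_subclasses=False` only agents registered with exactly the requested type match; base classes of the requested type never match in either mode.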

factory

factory() -> ExchangeFactory[Self]

Get an exchange factory.

Source code in academy/exchange/transport.py
def factory(self) -> ExchangeFactory[Self]:
    """Get an exchange factory."""
    ...

recv async

recv(timeout: float | None = None) -> Message

Receive the next message sent to the mailbox.

This blocks until the next message is received, the timeout expires, or the mailbox is terminated.

Parameters:

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for the next message. If None (the default), block until the next message arrives or the mailbox is closed.

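Raises:

  • MailboxTerminatedError

    If the mailbox was closed.

  • ExchangeError

    Error returned by the exchange.

  • TimeoutError

    If a timeout was specified and exceeded.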

Source code in academy/exchange/transport.py
async def recv(self, timeout: float | None = None) -> Message:
    """Receive the next message sent to the mailbox.

    This blocks until the next message is received, there is a timeout, or
    the mailbox is terminated.

    Args:
        timeout: Optional timeout in seconds to wait for the next
            message. If `None`, the default, block forever until the
            next message or the mailbox is closed.

    Raises:
        MailboxTerminatedError: If the mailbox was closed.
        ExchangeError: Error returned by the exchange.
        TimeoutError: If a `timeout` was specified and exceeded.
    """
    ...
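
The three ways a `recv()` call can end (a message, a timeout, or termination) can be sketched with an `asyncio.Queue`. This is a toy model under stated assumptions, not how any real transport is implemented: `ToyMailbox`, its `put()` and `terminate()` methods, and the local `MailboxTerminatedError` class are hypothetical stand-ins.

```python
import asyncio
from typing import Optional


class MailboxTerminatedError(Exception):
    """Stand-in for the exception named in the docstring."""


_CLOSED = object()  # sentinel used to wake a receiver blocked in recv()


class ToyMailbox:
    """Hypothetical single-mailbox model backed by an asyncio.Queue."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._terminated = False

    async def put(self, message):
        await self._queue.put(message)

    async def terminate(self):
        self._terminated = True
        await self._queue.put(_CLOSED)

    async def recv(self, timeout: Optional[float] = None):
        if self._terminated:
            raise MailboxTerminatedError('mailbox was terminated')
        try:
            # timeout=None makes wait_for block until an item arrives.
            item = await asyncio.wait_for(self._queue.get(), timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f'no message within {timeout} seconds') from None
        if item is _CLOSED:
            raise MailboxTerminatedError('mailbox was terminated')
        return item


async def demo():
    mailbox = ToyMailbox()  # created inside the running event loop
    events = []
    await mailbox.put('hello')
    events.append(await mailbox.recv(timeout=1.0))
    try:
        await mailbox.recv(timeout=0.01)
    except TimeoutError:
        events.append('timeout')
    await mailbox.terminate()
    try:
        await mailbox.recv()
    except MailboxTerminatedError:
        events.append('terminated')
    return events
```

Callers that poll with a timeout should treat `TimeoutError` as "nothing yet" and `MailboxTerminatedError` as a signal to stop receiving.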

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistrationT_co

Register a new agent and associated mailbox with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

  • AgentRegistrationT_co

    Agent registration info.

Raises:

  • ExchangeError

    Error returned by the exchange.

Source code in academy/exchange/transport.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistrationT_co:
    """Register a new agent and associated mailbox with the exchange.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info.

    Raises:
        ExchangeError: Error returned by the exchange.
    """
    ...

send async

send(message: Message) -> None

Send a message to a mailbox.

Parameters:

  • message (Message) –

    Message to send.

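Raises:

  • BadEntityIdError

    If a mailbox for message.dest does not exist.

  • MailboxTerminatedError

    If the mailbox was closed.

  • ExchangeError

    Error returned by the exchange.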

Source code in academy/exchange/transport.py
async def send(self, message: Message) -> None:
    """Send a message to a mailbox.

    Args:
        message: Message to send.

    Raises:
        BadEntityIdError: If a mailbox for `message.dest` does not exist.
        MailboxTerminatedError: If the mailbox was closed.
        ExchangeError: Error returned by the exchange.
    """
    ...

status async

status(uid: EntityId) -> MailboxStatus

Check the status of a mailbox in the exchange.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to check.

Raises:

  • ExchangeError

    Error returned by the exchange.

Source code in academy/exchange/transport.py
async def status(self, uid: EntityId) -> MailboxStatus:
    """Check the status of a mailbox in the exchange.

    Args:
        uid: Entity identifier of the mailbox to check.

    Raises:
        ExchangeError: Error returned by the exchange.
    """
    ...

terminate async

terminate(uid: EntityId) -> None

Terminate a mailbox in the exchange.

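Once an entity's mailbox is terminated:

  • All request messages in the mailbox will be replied to with a MailboxTerminatedError.
  • All calls to recv() will raise a MailboxTerminatedError.
  • All attempts to send() to this mailbox by other entities will raise a MailboxTerminatedError.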

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Raises:

  • ExchangeError

    Error returned by the exchange.

Source code in academy/exchange/transport.py
async def terminate(self, uid: EntityId) -> None:
    """Terminate a mailbox in the exchange.

    Once an entity's mailbox is terminated:

    * All request messages in the mailbox will be replied to with a
      [`MailboxTerminatedError`][academy.exception.MailboxTerminatedError].
    * All calls to
      [`recv()`][academy.exchange.transport.ExchangeTransport.recv]
      will raise a
      [`MailboxTerminatedError`][academy.exception.MailboxTerminatedError].
    * All attempts to
      [`send()`][academy.exchange.transport.ExchangeTransport.send]
      to this mailbox by other entities will raise a
      [`MailboxTerminatedError`][academy.exception.MailboxTerminatedError].

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.

    Raises:
        ExchangeError: Error returned by the exchange.
    """
    ...
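
The interplay between `status()`, `send()`, and `terminate()` described above can be sketched with a toy in-memory exchange. All names below are illustrative assumptions: `ToyExchange`, its `register()` method, string entity IDs, and the locally defined exception classes are hypothetical. The toy also simply drops pending messages on termination, whereas the documented behavior replies to pending requests with an error.

```python
import asyncio
from enum import Enum


class MailboxStatus(Enum):
    MISSING = 'MISSING'
    ACTIVE = 'ACTIVE'
    TERMINATED = 'TERMINATED'


class BadEntityIdError(Exception):
    """Stand-in: no mailbox exists for the destination."""


class MailboxTerminatedError(Exception):
    """Stand-in: the destination mailbox was terminated."""


class ToyExchange:
    """Hypothetical in-memory exchange keyed by string entity IDs."""

    def __init__(self):
        self._status = {}
        self._messages = {}

    async def register(self, uid):
        self._status[uid] = MailboxStatus.ACTIVE
        self._messages[uid] = []

    async def status(self, uid):
        return self._status.get(uid, MailboxStatus.MISSING)

    async def send(self, dest, body):
        status = await self.status(dest)
        if status is MailboxStatus.MISSING:
            raise BadEntityIdError(dest)
        if status is MailboxStatus.TERMINATED:
            raise MailboxTerminatedError(dest)
        self._messages[dest].append(body)

    async def terminate(self, uid):
        if uid not in self._status:
            return  # no-op when the mailbox does not exist
        self._status[uid] = MailboxStatus.TERMINATED
        self._messages[uid].clear()  # simplification: pending messages dropped


async def demo():
    exchange = ToyExchange()
    await exchange.register('agent-1')
    events = [(await exchange.status('agent-1')).value]
    await exchange.send('agent-1', 'ping')
    await exchange.terminate('agent-1')
    await exchange.terminate('ghost')  # unknown mailbox: nothing happens
    events.append((await exchange.status('agent-1')).value)
    events.append((await exchange.status('ghost')).value)
    try:
        await exchange.send('agent-1', 'ping')
    except MailboxTerminatedError:
        events.append('send: terminated')
    try:
        await exchange.send('ghost', 'ping')
    except BadEntityIdError:
        events.append('send: bad id')
    return events
```

Note that terminating an unknown mailbox leaves its status as MISSING rather than creating a TERMINATED entry.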

ExchangeTransportMixin

Magic method mixin for exchange transport implementations.

Adds __repr__, __str__, and context manager support.
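
A mixin of this kind might look roughly like the sketch below. This is an assumption-laden illustration, not the library's implementation: the class names, the string formats, and the choice of an async context manager (made here only because `close()` is async) are all hypothetical.

```python
import asyncio


class ReprContextMixin:
    """Hypothetical mixin; expects `mailbox_id` and an async `close()`."""

    def __repr__(self):
        return f'{type(self).__name__}()'

    def __str__(self):
        return f'{type(self).__name__}<{self.mailbox_id}>'

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Close the transport on exit; the mailbox itself is left untouched.
        await self.close()


class ToyTransport(ReprContextMixin):
    """Toy transport used only to exercise the mixin."""

    def __init__(self, mailbox_id):
        self.mailbox_id = mailbox_id
        self.closed = False

    async def close(self):
        self.closed = True


async def demo():
    async with ToyTransport('user-1') as transport:
        closed_inside = transport.closed
    return closed_inside, transport.closed, repr(transport), str(transport)
```

Using the transport in an `async with` block ensures `close()` runs even if the body raises.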