Skip to content

academy.exchange.client

ExchangeClient

ExchangeClient(transport: ExchangeTransportT)

Bases: ABC, Generic[ExchangeTransportT]

Base exchange client.

Warning

Exchange clients should only be created via ExchangeFactory.create_agent_client() or ExchangeFactory.create_user_client()!

Parameters:

Source code in academy/exchange/client.py
def __init__(
    self,
    transport: ExchangeTransportT,
) -> None:
    self._transport = transport
    self._handles: WeakValueDictionary[uuid.UUID, Handle[Any]] = (
        WeakValueDictionary()
    )
    self._close_lock = asyncio.Lock()
    self._closed = False

client_id abstractmethod property

client_id: EntityId

Client ID as registered with the exchange.

close abstractmethod async

close() -> None

Close the transport.

Source code in academy/exchange/client.py
@abc.abstractmethod
async def close(self) -> None:
    """Close the transport."""
    ...

discover async

discover(
    agent: type[Agent], *, allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents with a given agent.

Parameters:

  • agent (type[Agent]) –

    Agent type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

Source code in academy/exchange/client.py
async def discover(
    self,
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given agent.

    Args:
        agent: Agent type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent IDs implementing the agent.
    """
    return await self._transport.discover(
        agent,
        allow_subclasses=allow_subclasses,
    )

factory

Get an exchange factory.

Source code in academy/exchange/client.py
def factory(self) -> ExchangeFactory[ExchangeTransportT]:
    """Get an exchange factory."""
    return self._transport.factory()

register_handle

register_handle(handle: Handle[AgentT]) -> None

Register an existing handle to receive messages.

Parameters:

Source code in academy/exchange/client.py
def register_handle(self, handle: Handle[AgentT]) -> None:
    """Register an existing handle to receive messages.

    Args:
        handle: Handle to register.
    """
    self._handles[handle.handle_id] = handle

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]

Register a new agent and associated mailbox with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

Source code in academy/exchange/client.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistration[AgentT]:
    """Register a new agent and associated mailbox with the exchange.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info.
    """
    registration = await self._transport.register_agent(
        agent,
        name=name,
    )
    logger.info('Registered %s in exchange', registration.agent_id)
    return registration

send async

send(message: Message[Any]) -> None

Send a message to a mailbox.

Parameters:

Raises:

  • BadEntityIdError

    If a mailbox for message.dest does not exist.

  • MailboxTerminatedError

    If the mailbox was closed.

Source code in academy/exchange/client.py
async def send(self, message: Message[Any]) -> None:
    """Send a message to a mailbox.

    Args:
        message: Message to send.

    Raises:
        BadEntityIdError: If a mailbox for `message.dest` does not exist.
        MailboxTerminatedError: If the mailbox was closed.
    """
    await self._transport.send(message)
    logger.debug('Sent %s to %s', type(message).__name__, message.dest)

status async

status(uid: EntityId) -> MailboxStatus

Check the status of a mailbox in the exchange.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to check.

Source code in academy/exchange/client.py
async def status(self, uid: EntityId) -> MailboxStatus:
    """Check the status of a mailbox in the exchange.

    Args:
        uid: Entity identifier of the mailbox to check.
    """
    return await self._transport.status(uid)

terminate async

terminate(uid: EntityId) -> None

Terminate a mailbox in the exchange.

Terminating a mailbox means that the corresponding entity will no longer be able to receive messages.

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/client.py
async def terminate(self, uid: EntityId) -> None:
    """Terminate a mailbox in the exchange.

    Terminating a mailbox means that the corresponding entity will no
    longer be able to receive messages.

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    await self._transport.terminate(uid)

AgentExchangeClient

AgentExchangeClient(
    agent_id: AgentId[AgentT],
    transport: ExchangeTransportT,
    request_handler: RequestHandler[RequestT_co],
)

Bases: ExchangeClient[ExchangeTransportT], Generic[AgentT, ExchangeTransportT]

Agent exchange client.

Warning

Agent exchange clients should only be created via ExchangeFactory.create_agent_client()!

Parameters:

  • agent_id (AgentId[AgentT]) –

    Agent ID.

  • transport (ExchangeTransportT) –

    Exchange transport bound to agent_id.

  • request_handler (RequestHandler[RequestT_co]) –

    Request handler of the agent that will be called for each message received to this agent's mailbox. start_listener: Start a message listener thread.

Source code in academy/exchange/client.py
def __init__(
    self,
    agent_id: AgentId[AgentT],
    transport: ExchangeTransportT,
    request_handler: RequestHandler[RequestT_co],
) -> None:
    super().__init__(transport)
    self._agent_id = agent_id
    self._request_handler = request_handler

client_id property

client_id: AgentId[AgentT]

Agent ID of the client.

close async

close() -> None

Close the user client.

This closes the underlying exchange transport and all handles created by this client. The agent's mailbox will not be terminated so the agent can be started again later.

Source code in academy/exchange/client.py
async def close(self) -> None:
    """Close the user client.

    This closes the underlying exchange transport and all handles created
    by this client. The agent's mailbox will not be terminated so the agent
    can be started again later.
    """
    async with self._close_lock:
        if self._closed:
            return

        await self._transport.close()
        self._closed = True
        logger.info('Closed exchange client for %s', self.client_id)

discover async

discover(
    agent: type[Agent], *, allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents with a given agent.

Parameters:

  • agent (type[Agent]) –

    Agent type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

Source code in academy/exchange/client.py
async def discover(
    self,
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given agent.

    Args:
        agent: Agent type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent IDs implementing the agent.
    """
    return await self._transport.discover(
        agent,
        allow_subclasses=allow_subclasses,
    )

factory

Get an exchange factory.

Source code in academy/exchange/client.py
def factory(self) -> ExchangeFactory[ExchangeTransportT]:
    """Get an exchange factory."""
    return self._transport.factory()

register_handle

register_handle(handle: Handle[AgentT]) -> None

Register an existing handle to receive messages.

Parameters:

Source code in academy/exchange/client.py
def register_handle(self, handle: Handle[AgentT]) -> None:
    """Register an existing handle to receive messages.

    Args:
        handle: Handle to register.
    """
    self._handles[handle.handle_id] = handle

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]

Register a new agent and associated mailbox with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

Source code in academy/exchange/client.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistration[AgentT]:
    """Register a new agent and associated mailbox with the exchange.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info.
    """
    registration = await self._transport.register_agent(
        agent,
        name=name,
    )
    logger.info('Registered %s in exchange', registration.agent_id)
    return registration

send async

send(message: Message[Any]) -> None

Send a message to a mailbox.

Parameters:

Raises:

  • BadEntityIdError

    If a mailbox for message.dest does not exist.

  • MailboxTerminatedError

    If the mailbox was closed.

Source code in academy/exchange/client.py
async def send(self, message: Message[Any]) -> None:
    """Send a message to a mailbox.

    Args:
        message: Message to send.

    Raises:
        BadEntityIdError: If a mailbox for `message.dest` does not exist.
        MailboxTerminatedError: If the mailbox was closed.
    """
    await self._transport.send(message)
    logger.debug('Sent %s to %s', type(message).__name__, message.dest)

status async

status(uid: EntityId) -> MailboxStatus

Check the status of a mailbox in the exchange.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to check.

Source code in academy/exchange/client.py
async def status(self, uid: EntityId) -> MailboxStatus:
    """Check the status of a mailbox in the exchange.

    Args:
        uid: Entity identifier of the mailbox to check.
    """
    return await self._transport.status(uid)

terminate async

terminate(uid: EntityId) -> None

Terminate a mailbox in the exchange.

Terminating a mailbox means that the corresponding entity will no longer be able to receive messages.

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/client.py
async def terminate(self, uid: EntityId) -> None:
    """Terminate a mailbox in the exchange.

    Terminating a mailbox means that the corresponding entity will no
    longer be able to receive messages.

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    await self._transport.terminate(uid)

UserExchangeClient

UserExchangeClient(
    user_id: UserId,
    transport: ExchangeTransportT,
    *,
    start_listener: bool = True
)

Bases: ExchangeClient[ExchangeTransportT]

User exchange client.

Warning

User exchange clients should only be created via ExchangeFactory.create_user_client()!

Parameters:

  • user_id (UserId) –

    User ID.

  • transport (ExchangeTransportT) –

    Exchange transport bound to user_id.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Source code in academy/exchange/client.py
def __init__(
    self,
    user_id: UserId,
    transport: ExchangeTransportT,
    *,
    start_listener: bool = True,
) -> None:
    super().__init__(transport)
    self._user_id = user_id
    self._listener_task: asyncio.Task[None] | None = None
    if start_listener:
        self._listener_task = asyncio.create_task(
            self._listen_for_messages(),
            name=f'user-exchange-listener-{self.client_id}',
        )

client_id property

client_id: UserId

User ID of the client.

close async

close() -> None

Close the user client.

This terminates the user's mailbox, closes the underlying exchange transport.

Source code in academy/exchange/client.py
async def close(self) -> None:
    """Close the user client.

    This terminates the user's mailbox, closes the underlying exchange
    transport.
    """
    async with self._close_lock:
        if self._closed:
            return

        await self._transport.terminate(self.client_id)
        logger.info(f'Terminated mailbox for {self.client_id}')
        await self._stop_listener_task()
        await self._transport.close()
        self._closed = True
        logger.info('Closed exchange client for %s', self.client_id)

discover async

discover(
    agent: type[Agent], *, allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents with a given agent.

Parameters:

  • agent (type[Agent]) –

    Agent type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

Source code in academy/exchange/client.py
async def discover(
    self,
    agent: type[Agent],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given agent.

    Args:
        agent: Agent type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent IDs implementing the agent.
    """
    return await self._transport.discover(
        agent,
        allow_subclasses=allow_subclasses,
    )

factory

Get an exchange factory.

Source code in academy/exchange/client.py
def factory(self) -> ExchangeFactory[ExchangeTransportT]:
    """Get an exchange factory."""
    return self._transport.factory()

register_handle

register_handle(handle: Handle[AgentT]) -> None

Register an existing handle to receive messages.

Parameters:

Source code in academy/exchange/client.py
def register_handle(self, handle: Handle[AgentT]) -> None:
    """Register an existing handle to receive messages.

    Args:
        handle: Handle to register.
    """
    self._handles[handle.handle_id] = handle

register_agent async

register_agent(
    agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]

Register a new agent and associated mailbox with the exchange.

Parameters:

  • agent (type[AgentT]) –

    Agent type of the agent.

  • name (str | None, default: None ) –

    Optional display name for the agent.

Returns:

Source code in academy/exchange/client.py
async def register_agent(
    self,
    agent: type[AgentT],
    *,
    name: str | None = None,
) -> AgentRegistration[AgentT]:
    """Register a new agent and associated mailbox with the exchange.

    Args:
        agent: Agent type of the agent.
        name: Optional display name for the agent.

    Returns:
        Agent registration info.
    """
    registration = await self._transport.register_agent(
        agent,
        name=name,
    )
    logger.info('Registered %s in exchange', registration.agent_id)
    return registration

send async

send(message: Message[Any]) -> None

Send a message to a mailbox.

Parameters:

Raises:

  • BadEntityIdError

    If a mailbox for message.dest does not exist.

  • MailboxTerminatedError

    If the mailbox was closed.

Source code in academy/exchange/client.py
async def send(self, message: Message[Any]) -> None:
    """Send a message to a mailbox.

    Args:
        message: Message to send.

    Raises:
        BadEntityIdError: If a mailbox for `message.dest` does not exist.
        MailboxTerminatedError: If the mailbox was closed.
    """
    await self._transport.send(message)
    logger.debug('Sent %s to %s', type(message).__name__, message.dest)

status async

status(uid: EntityId) -> MailboxStatus

Check the status of a mailbox in the exchange.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to check.

Source code in academy/exchange/client.py
async def status(self, uid: EntityId) -> MailboxStatus:
    """Check the status of a mailbox in the exchange.

    Args:
        uid: Entity identifier of the mailbox to check.
    """
    return await self._transport.status(uid)

terminate async

terminate(uid: EntityId) -> None

Terminate a mailbox in the exchange.

Terminating a mailbox means that the corresponding entity will no longer be able to receive messages.

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/client.py
async def terminate(self, uid: EntityId) -> None:
    """Terminate a mailbox in the exchange.

    Terminating a mailbox means that the corresponding entity will no
    longer be able to receive messages.

    Note:
        This method is a no-op if the mailbox does not exist.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    await self._transport.terminate(uid)