Skip to content

academy.exchange.cloud.backend

MailboxBackend

Bases: Protocol

Backend protocol for storing mailboxes on server.

check_mailbox async

check_mailbox(client: str, uid: EntityId) -> MailboxStatus

Check if a mailbox exists, or is terminated.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to check.

Returns:

Raises:

  • ForbiddenError

    If the client does not have the right permissions.

Source code in academy/exchange/cloud/backend.py
async def check_mailbox(
    self,
    client: str,
    uid: EntityId,
) -> MailboxStatus:
    """Check if a mailbox exists, or is terminated.

    Args:
        client: Client making the request.
        uid: Mailbox id to check.

    Returns:
        The mailbox status.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
    """
    ...

create_mailbox async

create_mailbox(
    client: str,
    uid: EntityId,
    agent: tuple[str, ...] | None = None,
) -> None

Create a mailbox is not exists.

This method should be idempotent.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to check.

  • agent (tuple[str, ...] | None, default: None ) –

    The agent_mro for behavior discovery.

Raises:

  • ForbiddenError

    If the client does not have the right permissions.

Source code in academy/exchange/cloud/backend.py
async def create_mailbox(
    self,
    client: str,
    uid: EntityId,
    agent: tuple[str, ...] | None = None,
) -> None:
    """Create a mailbox is not exists.

    This method should be idempotent.

    Args:
        client: Client making the request.
        uid: Mailbox id to check.
        agent: The agent_mro for behavior discovery.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
    """

terminate async

terminate(client: str, uid: EntityId) -> None

Close a mailbox.

For security, the manager should keep a gravestone so the same id cannot be re-registered.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to close.

Raises:

  • ForbiddenError

    If the client does not have the right permissions.

Source code in academy/exchange/cloud/backend.py
async def terminate(self, client: str, uid: EntityId) -> None:
    """Close a mailbox.

    For security, the manager should keep a gravestone so the same id
    cannot be re-registered.

    Args:
        client: Client making the request.
        uid: Mailbox id to close.

    Raises:
        ForbiddenError: If the client does not have the right permissions.

    """
    ...

discover async

discover(
    client: str, agent: str, allow_subclasses: bool
) -> list[AgentId[Any]]

Find mailboxes of matching agent class.

Parameters:

  • client (str) –

    Client making the request.

  • agent (str) –

    Agent class to search for.

  • allow_subclasses (bool) –

    Include agents that inherit from the target.

Raises:

  • ForbiddenError

    If the client does not have the right permissions.

Source code in academy/exchange/cloud/backend.py
async def discover(
    self,
    client: str,
    agent: str,
    allow_subclasses: bool,
) -> list[AgentId[Any]]:
    """Find mailboxes of matching agent class.

    Args:
        client: Client making the request.
        agent: Agent class to search for.
        allow_subclasses: Include agents that inherit from the target.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
    """
    ...

get async

get(
    client: str,
    uid: EntityId,
    *,
    timeout: float | None = None
) -> Message[Any]

Get messages from a mailbox.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to get messages.

  • timeout (float | None, default: None ) –

    Time in seconds to wait for message. If None, wait indefinitely.

Raises:

Source code in academy/exchange/cloud/backend.py
async def get(
    self,
    client: str,
    uid: EntityId,
    *,
    timeout: float | None = None,
) -> Message[Any]:
    """Get messages from a mailbox.

    Args:
        client: Client making the request.
        uid: Mailbox id to get messages.
        timeout: Time in seconds to wait for message.
            If None, wait indefinitely.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
        BadEntityIdError: The mailbox requested does not exist.
        MailboxTerminatedError: The mailbox is closed.
        TimeoutError: There was not message received during the timeout.
    """
    ...

put async

put(client: str, message: Message[Any]) -> None

Put a message in a mailbox.

Parameters:

  • client (str) –

    Client making the request.

  • message (Message[Any]) –

    Message to put in mailbox.

Raises:

Source code in academy/exchange/cloud/backend.py
async def put(self, client: str, message: Message[Any]) -> None:
    """Put a message in a mailbox.

    Args:
        client: Client making the request.
        message: Message to put in mailbox.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
        BadEntityIdError: The mailbox requested does not exist.
        MailboxTerminatedError: The mailbox is closed.
        MessageTooLargeError: The message is larger than the message
            size limit for this exchange.
    """
    ...

PythonBackend

PythonBackend(message_size_limit_kb: int = 1024)

Mailbox backend using in-memory python data structures.

Parameters:

  • message_size_limit_kb (int, default: 1024 ) –

    Maximum message size to allow.

Source code in academy/exchange/cloud/backend.py
def __init__(
    self,
    message_size_limit_kb: int = 1024,
) -> None:
    self._owners: dict[EntityId, str | None] = {}
    self._mailboxes: dict[EntityId, AsyncQueue[Message[Any]]] = {}
    self._terminated: set[EntityId] = set()
    self._agents: dict[AgentId[Any], tuple[str, ...]] = {}
    self._locks: dict[EntityId, asyncio.Lock] = {}
    self.message_size_limit = message_size_limit_kb * KB_TO_BYTES

check_mailbox async

check_mailbox(client: str, uid: EntityId) -> MailboxStatus

Check if a mailbox exists, or is terminated.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to check.

Returns:

Raises:

  • ForbiddenError

    If the client does not have the right permissions.

Source code in academy/exchange/cloud/backend.py
async def check_mailbox(
    self,
    client: str,
    uid: EntityId,
) -> MailboxStatus:
    """Check if a mailbox exists, or is terminated.

    Args:
        client: Client making the request.
        uid: Mailbox id to check.

    Returns:
        The mailbox status.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
    """
    if uid not in self._mailboxes:
        return MailboxStatus.MISSING
    elif not self._has_permissions(client, uid):
        raise ForbiddenError(
            'Client does not have correct permissions.',
        )

    async with self._locks[uid]:
        if uid in self._terminated:
            return MailboxStatus.TERMINATED
        else:
            return MailboxStatus.ACTIVE

create_mailbox async

create_mailbox(
    client: str,
    uid: EntityId,
    agent: tuple[str, ...] | None = None,
) -> None

Create a mailbox is not exists.

This method should be idempotent.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to check.

  • agent (tuple[str, ...] | None, default: None ) –

    The agent_mro for behavior discovery.

Raises:

  • ForbiddenError

    If the client does not have the right permissions.

Source code in academy/exchange/cloud/backend.py
async def create_mailbox(
    self,
    client: str,
    uid: EntityId,
    agent: tuple[str, ...] | None = None,
) -> None:
    """Create a mailbox is not exists.

    This method should be idempotent.

    Args:
        client: Client making the request.
        uid: Mailbox id to check.
        agent: The agent_mro for behavior discovery.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
    """
    if not self._has_permissions(client, uid):
        raise ForbiddenError(
            'Client does not have correct permissions.',
        )

    mailbox = self._mailboxes.get(uid, None)
    if mailbox is None:
        if sys.version_info >= (3, 13):  # pragma: >=3.13 cover
            queue: AsyncQueue[Message[Any]] = Queue()
        else:  # pragma: <3.13 cover
            queue: AsyncQueue[Message[Any]] = Queue().async_q
        self._mailboxes[uid] = queue
        self._terminated.discard(uid)
        self._owners[uid] = client
        self._locks[uid] = asyncio.Lock()
        if agent is not None and isinstance(uid, AgentId):
            self._agents[uid] = agent
        logger.info('Created mailbox for %s', uid)

terminate async

terminate(client: str, uid: EntityId) -> None

Close a mailbox.

For security, the manager should keep a gravestone so the same id cannot be re-registered.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to close.

Raises:

  • ForbiddenError

    If the client does not have the right permissions.

Source code in academy/exchange/cloud/backend.py
async def terminate(self, client: str, uid: EntityId) -> None:
    """Close a mailbox.

    For security, the manager should keep a gravestone so the same id
    cannot be re-registered.

    Args:
        client: Client making the request.
        uid: Mailbox id to close.

    Raises:
        ForbiddenError: If the client does not have the right permissions.

    """
    if not self._has_permissions(client, uid):
        raise ForbiddenError(
            'Client does not have correct permissions.',
        )

    self._terminated.add(uid)
    mailbox = self._mailboxes.get(uid, None)
    if mailbox is None:
        return

    async with self._locks[uid]:
        messages = await _drain_queue(mailbox)
        for message in messages:
            if message.is_request():
                error = MailboxTerminatedError(uid)
                body = ErrorResponse(exception=error)
                response = message.create_response(body)
                with contextlib.suppress(Exception):
                    await self.put(client, response)

        mailbox.shutdown(immediate=True)
        logger.info('Closed mailbox for %s', uid)

discover async

discover(
    client: str, agent: str, allow_subclasses: bool
) -> list[AgentId[Any]]

Find mailboxes of matching agent class.

Parameters:

  • client (str) –

    Client making the request.

  • agent (str) –

    Agent class to search for.

  • allow_subclasses (bool) –

    Include agents that inherit from the target.

Raises:

  • ForbiddenError

    If the client does not have the right permissions.

Source code in academy/exchange/cloud/backend.py
async def discover(
    self,
    client: str,
    agent: str,
    allow_subclasses: bool,
) -> list[AgentId[Any]]:
    """Find mailboxes of matching agent class.

    Args:
        client: Client making the request.
        agent: Agent class to search for.
        allow_subclasses: Include agents that inherit from the target.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
    """
    found: list[AgentId[Any]] = []
    for aid, agents in self._agents.items():
        if not self._has_permissions(client, aid):
            continue
        if aid in self._terminated:
            continue
        if agent == agents[0] or (allow_subclasses and agent in agents):
            found.append(aid)
    return found

get async

get(
    client: str,
    uid: EntityId,
    *,
    timeout: float | None = None
) -> Message[Any]

Get messages from a mailbox.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to get messages.

  • timeout (float | None, default: None ) –

    Time in seconds to wait for message. If None, wait indefinitely.

Raises:

Source code in academy/exchange/cloud/backend.py
async def get(
    self,
    client: str,
    uid: EntityId,
    *,
    timeout: float | None = None,
) -> Message[Any]:
    """Get messages from a mailbox.

    Args:
        client: Client making the request.
        uid: Mailbox id to get messages.
        timeout: Time in seconds to wait for message.
            If None, wait indefinitely.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
        BadEntityIdError: The mailbox requested does not exist.
        MailboxTerminatedError: The mailbox is closed.
        TimeoutError: There was not message received during the timeout.
    """
    if not self._has_permissions(client, uid):
        raise ForbiddenError(
            'Client does not have correct permissions.',
        )

    try:
        queue = self._mailboxes[uid]
    except KeyError as e:
        raise BadEntityIdError(uid) from e
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except QueueShutDown:
        raise MailboxTerminatedError(uid) from None
    except asyncio.TimeoutError:
        # In Python 3.10 and older, asyncio.TimeoutError and TimeoutError
        # are different error types.
        raise TimeoutError(
            f'No message retrieved within {timeout} seconds.',
        ) from None

put async

put(client: str, message: Message[Any]) -> None

Put a message in a mailbox.

Parameters:

  • client (str) –

    Client making the request.

  • message (Message[Any]) –

    Message to put in mailbox.

Raises:

Source code in academy/exchange/cloud/backend.py
async def put(self, client: str, message: Message[Any]) -> None:
    """Put a message in a mailbox.

    Args:
        client: Client making the request.
        message: Message to put in mailbox.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
        BadEntityIdError: The mailbox requested does not exist.
        MailboxTerminatedError: The mailbox is closed.
        MessageTooLargeError: The message is larger than the message
            size limit for this exchange.
    """
    if not self._has_permissions(client, message.dest):
        raise ForbiddenError(
            'Client does not have correct permissions.',
        )

    if sys.getsizeof(message.body) > self.message_size_limit:
        raise MessageTooLargeError(
            sys.getsizeof(message.body),
            self.message_size_limit,
        )

    try:
        queue = self._mailboxes[message.dest]
    except KeyError as e:
        raise BadEntityIdError(message.dest) from e

    async with self._locks[message.dest]:
        try:
            await queue.put(message)
        except QueueShutDown:
            raise MailboxTerminatedError(message.dest) from None

RedisBackend

RedisBackend(
    hostname: str = "localhost",
    port: int = 6379,
    *,
    message_size_limit_kb: int = 1024,
    kwargs: dict[str, Any] | None = None,
    mailbox_expiration_s: int | None = None,
    gravestone_expiration_s: int | None = None
)

Redis backend of mailboxes.

Parameters:

  • hostname (str, default: 'localhost' ) –

    Host address of redis.

  • port (int, default: 6379 ) –

    Redis port.

  • message_size_limit_kb (int, default: 1024 ) –

    Maximum message size to allow.

  • kwargs (dict[str, Any] | None, default: None ) –

    Addition arguments to pass to redis session.

Source code in academy/exchange/cloud/backend.py
def __init__(  # noqa: PLR0913
    self,
    hostname: str = 'localhost',
    port: int = 6379,
    *,
    message_size_limit_kb: int = 1024,
    kwargs: dict[str, Any] | None = None,
    mailbox_expiration_s: int | None = None,
    gravestone_expiration_s: int | None = None,
) -> None:
    self.message_size_limit = message_size_limit_kb * KB_TO_BYTES

    if kwargs is None:  # pragma: no branch
        kwargs = {}

    self._client = redis.asyncio.Redis(
        host=hostname,
        port=port,
        decode_responses=False,
        **kwargs,
    )
    self.mailbox_expiration_s = mailbox_expiration_s
    self.gravestone_expiration_s = gravestone_expiration_s

check_mailbox async

check_mailbox(client: str, uid: EntityId) -> MailboxStatus

Check if a mailbox exists, or is terminated.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to check.

Returns:

Raises:

  • ForbiddenError

    If the client does not have the right permissions.

Source code in academy/exchange/cloud/backend.py
async def check_mailbox(
    self,
    client: str,
    uid: EntityId,
) -> MailboxStatus:
    """Check if a mailbox exists, or is terminated.

    Args:
        client: Client making the request.
        uid: Mailbox id to check.

    Returns:
        The mailbox status.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
    """
    if not await self._has_permissions(client, uid):
        raise ForbiddenError(
            'Client does not have correct permissions.',
        )

    status = await self._client.get(self._active_key(uid))
    if status is None:
        return MailboxStatus.MISSING
    elif status == MailboxStatus.TERMINATED.value:
        return MailboxStatus.TERMINATED
    else:
        return MailboxStatus.ACTIVE

create_mailbox async

create_mailbox(
    client: str,
    uid: EntityId,
    agent: tuple[str, ...] | None = None,
) -> None

Create a mailbox is not exists.

This method should be idempotent.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to check.

  • agent (tuple[str, ...] | None, default: None ) –

    The agent_mro for behavior discovery.

Raises:

  • ForbiddenError

    If the client does not have the right permissions.

Source code in academy/exchange/cloud/backend.py
async def create_mailbox(
    self,
    client: str,
    uid: EntityId,
    agent: tuple[str, ...] | None = None,
) -> None:
    """Create a mailbox is not exists.

    This method should be idempotent.

    Args:
        client: Client making the request.
        uid: Mailbox id to check.
        agent: The agent_mro for behavior discovery.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
    """
    if not await self._has_permissions(client, uid):
        raise ForbiddenError(
            'Client does not have correct permissions.',
        )

    await self._client.set(
        self._active_key(uid),
        MailboxStatus.ACTIVE.value,
    )

    if agent is not None:
        await self._client.set(
            self._agent_key(uid),
            ','.join(agent),
        )

    await self._client.set(
        self._owner_key(uid),
        f'{client}{_OWNER_SUFFIX}',
    )
    await self._update_expirations(uid)

terminate async

terminate(client: str, uid: EntityId) -> None

Close a mailbox.

For security, the manager should keep a gravestone so the same id cannot be re-registered.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to close.

Raises:

  • ForbiddenError

    If the client does not have the right permissions.

Source code in academy/exchange/cloud/backend.py
async def terminate(self, client: str, uid: EntityId) -> None:
    """Close a mailbox.

    For security, the manager should keep a gravestone so the same id
    cannot be re-registered.

    Args:
        client: Client making the request.
        uid: Mailbox id to close.

    Raises:
        ForbiddenError: If the client does not have the right permissions.

    """
    if not await self._has_permissions(client, uid):
        raise ForbiddenError(
            'Client does not have correct permissions.',
        )

    status = await self.check_mailbox(client, uid)

    if status in {MailboxStatus.MISSING, MailboxStatus.TERMINATED}:
        return

    await self._client.set(
        self._active_key(uid),
        MailboxStatus.TERMINATED.value,
    )

    pending = await self._client.lrange(self._queue_key(uid), 0, -1)  # type: ignore[misc]
    if self.gravestone_expiration_s is not None:
        await self._client.expire(
            self._active_key(uid),
            self.gravestone_expiration_s,
        )

    await self._client.delete(self._queue_key(uid))
    # Sending a close sentinel to the queue is a quick way to force
    # the entity waiting on messages to the mailbox to stop blocking.
    # This assumes that only one entity is reading from the mailbox.
    await self._client.rpush(self._queue_key(uid), _CLOSE_SENTINEL)  # type: ignore[misc]
    if isinstance(uid, AgentId):
        await self._client.delete(self._agent_key(uid))

    for raw in pending:
        message: Message[Any] = Message.model_deserialize(raw)
        if message.is_request():
            error = MailboxTerminatedError(uid)
            body = ErrorResponse(exception=error)
            response = message.create_response(body)
            with contextlib.suppress(Exception):
                await self.put(client, response)

discover async

discover(
    client: str, agent: str, allow_subclasses: bool
) -> list[AgentId[Any]]

Find mailboxes of matching agent class.

Parameters:

  • client (str) –

    Client making the request.

  • agent (str) –

    Agent class to search for.

  • allow_subclasses (bool) –

    Include agents that inherit from the target.

Source code in academy/exchange/cloud/backend.py
async def discover(
    self,
    client: str,
    agent: str,
    allow_subclasses: bool,
) -> list[AgentId[Any]]:
    """Find mailboxes of matching agent class.

    Args:
        client: Client making the request.
        agent: Agent class to search for.
        allow_subclasses: Include agents that inherit from the target.
    """
    found: list[AgentId[Any]] = []
    async for key in self._client.scan_iter(
        'agent:*',
    ):  # pragma: no branch
        mro_str = await self._client.get(key)
        assert isinstance(mro_str, str)
        mro = mro_str.split(',')
        if agent == mro[0] or (allow_subclasses and agent in mro):
            aid: AgentId[Any] = AgentId(uid=uuid.UUID(key.split(':')[-1]))
            found.append(aid)

    active: list[AgentId[Any]] = []
    for aid in found:
        if await self._has_permissions(client, aid):
            status = await self._client.get(self._active_key(aid))
            if status == MailboxStatus.ACTIVE.value:  # pragma: no branch
                active.append(aid)

    return active

get async

get(
    client: str,
    uid: EntityId,
    *,
    timeout: float | None = None
) -> Message[Any]

Get messages from a mailbox.

Parameters:

  • client (str) –

    Client making the request.

  • uid (EntityId) –

    Mailbox id to get messages.

  • timeout (float | None, default: None ) –

    Time in seconds to wait for message. If None, wait indefinitely.

Raises:

Source code in academy/exchange/cloud/backend.py
async def get(
    self,
    client: str,
    uid: EntityId,
    *,
    timeout: float | None = None,
) -> Message[Any]:
    """Get messages from a mailbox.

    Args:
        client: Client making the request.
        uid: Mailbox id to get messages.
        timeout: Time in seconds to wait for message.
            If None, wait indefinitely.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
        BadEntityIdError: The mailbox requested does not exist.
        MailboxTerminatedError: The mailbox is closed.
        TimeoutError: There was not message received during the timeout.
    """
    if not await self._has_permissions(client, uid):
        raise ForbiddenError(
            'Client does not have correct permissions.',
        )

    _timeout = timeout if timeout is not None else 0
    status = await self._client.get(
        self._active_key(uid),
    )
    if status is None:
        raise BadEntityIdError(uid)
    elif status == MailboxStatus.TERMINATED.value:
        raise MailboxTerminatedError(uid)

    await self._update_expirations(uid)
    if self.mailbox_expiration_s:
        await self._client.expire(
            self._queue_key(uid),
            self.mailbox_expiration_s,
        )

    raw = await self._client.blpop(  # type: ignore[misc]
        [self._queue_key(uid)],
        timeout=_timeout,
    )
    if raw is None:
        raise TimeoutError(
            f'Timeout waiting for next message for {uid} '
            f'after {timeout} seconds.',
        )

    # Only passed one key to blpop to result is [key, item]
    assert len(raw) == 2  # noqa: PLR2004
    if raw[1] == _CLOSE_SENTINEL:  # pragma: no cover
        raise MailboxTerminatedError(uid)
    return Message.model_deserialize(raw[1])

put async

put(client: str, message: Message[Any]) -> None

Put a message in a mailbox.

Parameters:

  • client (str) –

    Client making the request.

  • message (Message[Any]) –

    Message to put in mailbox.

Raises:

Source code in academy/exchange/cloud/backend.py
async def put(self, client: str, message: Message[Any]) -> None:
    """Put a message in a mailbox.

    Args:
        client: Client making the request.
        message: Message to put in mailbox.

    Raises:
        ForbiddenError: If the client does not have the right permissions.
        BadEntityIdError: The mailbox requested does not exist.
        MailboxTerminatedError: The mailbox is closed.
        MessageTooLargeError: The message is larger than the message
            size limit for this exchange.
    """
    if not await self._has_permissions(client, message.dest):
        raise ForbiddenError(
            'Client does not have correct permissions.',
        )

    status = await self._client.get(self._active_key(message.dest))
    if status is None:
        raise BadEntityIdError(message.dest)
    elif status == MailboxStatus.TERMINATED.value:
        raise MailboxTerminatedError(message.dest)

    serialized = message.model_serialize()
    if len(serialized) > self.message_size_limit:
        raise MessageTooLargeError(
            len(serialized),
            self.message_size_limit,
        )

    await self._client.rpush(  # type: ignore[misc]
        self._queue_key(message.dest),
        serialized,
    )

    if self.mailbox_expiration_s:
        await self._client.expire(
            self._queue_key(message.dest),
            self.mailbox_expiration_s,
            nx=True,
        )