Skip to content

academy.exchange.redis

RedisExchange

RedisExchange(
    hostname: str,
    port: int,
    *,
    timeout: int | None = None,
    **kwargs: Any
)

Bases: ExchangeMixin

Redis-hosted message exchange interface.

Parameters:

  • hostname (str) –

    Redis server hostname.

  • port (int) –

    Redis server port.

  • kwargs (Any, default: {} ) –

    Extra keyword arguments to pass to redis.Redis().

  • timeout (int | None, default: None ) –

    Timeout for waiting on the next message. If None, each wait uses a one-second timeout and is retried indefinitely.

Raises:

Source code in academy/exchange/redis.py
def __init__(
    self,
    hostname: str,
    port: int,
    *,
    timeout: int | None = None,
    **kwargs: Any,
) -> None:
    """Connect to a Redis server and verify the connection.

    Args:
        hostname: Redis server hostname.
        port: Redis server port.
        timeout: Stored on the instance as `timeout`.
        kwargs: Extra keyword arguments forwarded to `redis.Redis()`;
            also stored on the instance.
    """
    self.hostname, self.port, self.timeout = hostname, port, timeout
    self._kwargs = kwargs
    # Responses are left undecoded (bytes) by the client.
    client = redis.Redis(
        host=hostname,
        port=port,
        decode_responses=False,
        **kwargs,
    )
    self._client = client
    # Round trip to the server so an unreachable server fails here.
    client.ping()

close

close() -> None

Close the exchange interface.

Source code in academy/exchange/redis.py
def close(self) -> None:
    """Close the underlying Redis client and log that it was closed."""
    client = self._client
    client.close()
    logger.debug('Closed exchange (%s)', self)

register_agent

register_agent(
    behavior: type[BehaviorT],
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    name: str | None = None
) -> AgentId[BehaviorT]

Create a new agent identifier and associated mailbox.

Parameters:

  • behavior (type[BehaviorT]) –

    Type of the behavior this agent will implement.

  • agent_id (AgentId[BehaviorT] | None, default: None ) –

    Specify the ID of the agent. Randomly generated by default.

  • name (str | None, default: None ) –

    Optional human-readable name for the agent. Ignored if agent_id is provided.

Returns:

  • AgentId[BehaviorT]

    Unique identifier for the agent's mailbox.

Source code in academy/exchange/redis.py
def register_agent(
    self,
    behavior: type[BehaviorT],
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    name: str | None = None,
) -> AgentId[BehaviorT]:
    """Register an agent with the exchange and return its identifier.

    The mailbox is marked active and the behavior's MRO is stored as a
    comma-separated string under the agent's behavior key.

    Args:
        behavior: Type of the behavior this agent will implement.
        agent_id: Identifier to register. A new one is generated when
            this is `None`.
        name: Optional human-readable name used when generating a new
            identifier. Ignored if `agent_id` is provided.

    Returns:
        Unique identifier for the agent's mailbox.
    """
    registered: AgentId[BehaviorT]
    if agent_id is not None:
        registered = agent_id
    else:
        registered = AgentId.new(name=name)

    client = self._client
    client.set(self._active_key(registered), _MailboxState.ACTIVE.value)

    behavior_key = self._behavior_key(registered)
    mro_csv = ','.join(behavior.behavior_mro())
    client.set(behavior_key, mro_csv)

    logger.debug('Registered %s in %s', registered, self)
    return registered

register_client

register_client(*, name: str | None = None) -> ClientId

Create a new client identifier and associated mailbox.

Parameters:

  • name (str | None, default: None ) –

    Optional human-readable name for the client.

Returns:

  • ClientId

    Unique identifier for the client's mailbox.

Source code in academy/exchange/redis.py
def register_client(
    self,
    *,
    name: str | None = None,
) -> ClientId:
    """Register a new client with the exchange.

    A fresh client identifier is generated and its mailbox is marked
    active in Redis.

    Args:
        name: Optional human-readable name for the client.

    Returns:
        Unique identifier for the client's mailbox.
    """
    client_id = ClientId.new(name=name)
    active_key = self._active_key(client_id)
    self._client.set(active_key, _MailboxState.ACTIVE.value)
    logger.debug('Registered %s in %s', client_id, self)
    return client_id

terminate

terminate(uid: EntityId) -> None

Close the mailbox for an entity from the exchange.

Note

This method is a no-op if the mailbox does not exist.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/redis.py
def terminate(self, uid: EntityId) -> None:
    """Mark an entity's mailbox as inactive and wake its reader.

    Note:
        No check is made that the mailbox exists before it is marked
        inactive.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    client = self._client
    client.set(self._active_key(uid), _MailboxState.INACTIVE.value)

    # A reader blocked on the queue is released by pushing the close
    # sentinel onto it. This relies on there being a single reader per
    # mailbox.
    client.rpush(self._queue_key(uid), _CLOSE_SENTINEL)

    # Only agents have a behavior key to clean up.
    is_agent = isinstance(uid, AgentId)
    if is_agent:
        client.delete(self._behavior_key(uid))

    logger.debug('Closed mailbox for %s (%s)', uid, self)

discover

discover(
    behavior: type[Behavior], allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents with a given behavior.

Warning

This method is O(n) and scans all keys in the Redis server.

Parameters:

  • behavior (type[Behavior]) –

    Behavior type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the behavior.

Returns:

  • tuple[AgentId[Any], ...]

    Tuple of agent IDs implementing the behavior.

Source code in academy/exchange/redis.py
def discover(
    self,
    behavior: type[Behavior],
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given behavior.

    Warning:
        This method is O(n) and scans all keys in the Redis server.

    Args:
        behavior: Behavior type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            behavior.

    Returns:
        Tuple of agent IDs implementing the behavior whose mailbox is
        currently marked active.
    """

    def _text(value: Any) -> Any:
        # The client is created with decode_responses=False, so keys and
        # values may come back as bytes. Normalize to str so the string
        # operations below work for either response type.
        return value.decode() if isinstance(value, bytes) else value

    fqp = f'{behavior.__module__}.{behavior.__name__}'

    found: list[AgentId[Any]] = []
    for raw_key in self._client.scan_iter('behavior:*'):
        raw_mro = self._client.get(raw_key)
        if raw_mro is None:
            # The behavior key disappeared between the scan and the read
            # (terminate() deletes it), so the agent is gone; skip it.
            continue
        mro = _text(raw_mro).split(',')
        # mro[0] is the behavior itself; later entries are its bases.
        if fqp == mro[0] or (allow_subclasses and fqp in mro):
            uid = uuid.UUID(_text(raw_key).split(':')[-1])
            found.append(AgentId(uid=uid))

    # Filter out agents whose mailbox is no longer marked active.
    active_value = _text(_MailboxState.ACTIVE.value)
    return tuple(
        aid
        for aid in found
        if _text(self._client.get(self._active_key(aid))) == active_value
    )

get_mailbox

get_mailbox(uid: EntityId) -> RedisMailbox

Get a client to a specific mailbox.

Parameters:

  • uid (EntityId) –

    EntityId of the mailbox.

Returns:

Raises:

Source code in academy/exchange/redis.py
def get_mailbox(self, uid: EntityId) -> RedisMailbox:
    """Create a mailbox client bound to this exchange.

    Args:
        uid: EntityId of the mailbox.

    Returns:
        Mailbox client.

    Raises:
        BadEntityIdError: if a mailbox for `uid` does not exist.
    """
    mailbox = RedisMailbox(uid, self)
    return mailbox

send

send(uid: EntityId, message: Message) -> None

Send a message to a mailbox.

Parameters:

  • uid (EntityId) –

    Destination address of the message.

  • message (Message) –

    Message to send.

Raises:

Source code in academy/exchange/redis.py
def send(self, uid: EntityId, message: Message) -> None:
    """Append a serialized message to the destination mailbox queue.

    Args:
        uid: Destination address of the message.
        message: Message to send.

    Raises:
        BadEntityIdError: if a mailbox for `uid` does not exist.
        MailboxClosedError: if the mailbox was closed.
    """
    state = self._client.get(self._active_key(uid))

    # No status key means the mailbox was never registered.
    if state is None:
        raise BadEntityIdError(uid)
    if state == _MailboxState.INACTIVE.value:
        raise MailboxClosedError(uid)

    queue = self._queue_key(uid)
    payload = message.model_serialize()
    self._client.rpush(queue, payload)
    logger.debug('Sent %s to %s', type(message).__name__, uid)

get_handle

get_handle(
    aid: AgentId[BehaviorT],
) -> UnboundRemoteHandle[BehaviorT]

Create a new handle to an agent.

A handle enables a client to invoke actions on the agent.

Note

It is not possible to create a handle to a client since a handle is essentially a new client of a specific agent.

Parameters:

  • aid (AgentId[BehaviorT]) –

    EntityId of the agent to create a handle to.

Returns:

Raises:

Source code in academy/exchange/__init__.py
def get_handle(
    self: Exchange,
    aid: AgentId[BehaviorT],
) -> UnboundRemoteHandle[BehaviorT]:
    """Build an unbound remote handle for an agent.

    A handle enables a client to invoke actions on the agent.

    Note:
        Handles can only target agents: a handle is essentially a new
        client of a specific agent, so a client identifier is rejected.

    Args:
        aid: EntityId of the agent to create a handle to.

    Returns:
        Handle to the agent.

    Raises:
        TypeError: if `aid` is not an instance of
            [`AgentId`][academy.identifier.AgentId].
    """
    if isinstance(aid, AgentId):
        return UnboundRemoteHandle(self, aid)

    expected = AgentId.__name__
    received = type(aid).__name__
    raise TypeError(
        f'Handle must be created from an {expected} '
        f'but got identifier with type {received}.',
    )

RedisMailbox

RedisMailbox(uid: EntityId, exchange: RedisExchange)

Bases: NoPickleMixin

Client protocol that listens to incoming messages to a mailbox.

Parameters:

Raises:

Source code in academy/exchange/redis.py
def __init__(self, uid: EntityId, exchange: RedisExchange) -> None:
    """Bind to a mailbox and check that it is known to the exchange.

    Args:
        uid: EntityId of the mailbox.
        exchange: Exchange client used to reach the Redis server.

    Raises:
        BadEntityIdError: if no status is stored for `uid`.
    """
    self._uid = uid
    self._exchange = exchange

    client = self.exchange._client
    # A missing status key means the mailbox was never registered.
    if client.get(self.exchange._active_key(uid)) is None:
        raise BadEntityIdError(uid)

exchange property

exchange: RedisExchange

Exchange client.

mailbox_id property

mailbox_id: EntityId

Mailbox address/identifier.

close

close() -> None

Close this mailbox client.

Warning

This does not close the mailbox in the exchange. I.e., the exchange will still accept new messages to this mailbox, but this client will no longer be listening for them.

Source code in academy/exchange/redis.py
def close(self) -> None:
    """Close this mailbox client.

    Warning:
        The mailbox in the exchange is left untouched: the exchange keeps
        accepting new messages for it, but this client stops listening
        for them.
    """
    # Nothing to release; this client holds no resources of its own.
    return None

recv

recv(timeout: float | None = None) -> Message

Receive the next message in the mailbox.

This blocks until the next message is received or the mailbox is closed.

Parameters:

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for the next message. If None, the default, block forever until the next message or the mailbox is closed. Note that this will be cast to an int which is required by the Redis API.

Raises:

Source code in academy/exchange/redis.py
def recv(self, timeout: float | None = None) -> Message:
    """Receive the next message in the mailbox.

    This blocks until the next message is received or the mailbox
    is closed.

    Args:
        timeout: Optional timeout in seconds to wait for the next
            message. If `None`, the default, block forever until the
            next message or the mailbox is closed. The value is
            truncated to whole seconds before being passed to Redis and
            clamped to a minimum of one second, because a timeout of
            zero would make the blocking pop wait indefinitely.

    Raises:
        MailboxClosedError: if the mailbox was closed.
        TimeoutError: if a `timeout` was specified and exceeded.
    """
    # BLPOP interprets a timeout of 0 as "block forever", so a sub-second
    # timeout truncated by int() must not be allowed to reach 0; otherwise
    # a finite timeout would silently become an unbounded wait.
    # With no timeout, poll in one-second slices so the mailbox status is
    # re-checked on every iteration.
    _timeout = max(1, int(timeout)) if timeout is not None else 1
    while True:
        status = self.exchange._client.get(
            self.exchange._active_key(self.mailbox_id),
        )
        if status is None:
            raise AssertionError(
                f'Status for mailbox {self.mailbox_id} did not exist in '
                'Redis server. This means that something incorrectly '
                'deleted the key.',
            )
        elif status == _MailboxState.INACTIVE.value:
            raise MailboxClosedError(self.mailbox_id)

        raw = self.exchange._client.blpop(
            [self.exchange._queue_key(self.mailbox_id)],
            timeout=_timeout,
        )
        if raw is None and timeout is not None:
            raise TimeoutError(
                f'Timeout waiting for next message for {self.mailbox_id} '
                f'after {timeout} seconds.',
            )
        elif raw is None:  # pragma: no cover
            # No caller-supplied timeout: loop to re-check the status.
            continue

        # Only passed one key to blpop so result is [key, item]
        assert isinstance(raw, (tuple, list))
        assert len(raw) == 2  # noqa: PLR2004
        if raw[1] == _CLOSE_SENTINEL:  # pragma: no cover
            # The sentinel is pushed when the mailbox is terminated.
            raise MailboxClosedError(self.mailbox_id)
        message = BaseMessage.model_deserialize(raw[1])
        assert isinstance(message, get_args(Message))
        logger.debug(
            'Received %s to %s',
            type(message).__name__,
            self.mailbox_id,
        )
        return message