Skip to content

academy.exchange.hybrid

HybridExchange

HybridExchange(
    redis_host: str,
    redis_port: int,
    *,
    interface: str | None = None,
    namespace: str | None = "default",
    redis_kwargs: dict[str, Any] | None = None
)

Bases: ExchangeMixin

Hybrid exchange.

The hybrid exchange uses peer-to-peer communication via TCP and a central Redis server for mailbox state and queueing messages for offline entities.

Parameters:

  • redis_host (str) –

    Redis server hostname.

  • redis_port (int) –

    Redis server port.

  • interface (str | None, default: None ) –

    Network interface to use for peer-to-peer communication. If None, the hostname of the local host is used.

  • namespace (str | None, default: 'default' ) –

    Redis key namespace. If None, a random key prefix is generated.

  • redis_kwargs (dict[str, Any] | None, default: None ) –

    Extra keyword arguments to pass to redis.Redis().

Raises:

Source code in academy/exchange/hybrid.py
def __init__(
    self,
    redis_host: str,
    redis_port: int,
    *,
    interface: str | None = None,
    namespace: str | None = 'default',
    redis_kwargs: dict[str, Any] | None = None,
) -> None:
    # Record the connection settings first; `_init_connections()` is
    # invoked last, once every attribute below has been assigned.
    self._redis_host = redis_host
    self._redis_port = redis_port
    self._redis_kwargs = {} if redis_kwargs is None else redis_kwargs
    self._interface = interface

    # Without an explicit namespace, use a random base32-encoded UUID as
    # the Redis key prefix.
    if namespace is None:
        namespace = uuid_to_base32(uuid.uuid4())
    self._namespace = namespace

    self._init_connections()

close

close() -> None

Close the exchange interface.

Source code in academy/exchange/hybrid.py
def close(self) -> None:
    """Close the exchange interface.

    Closes the Redis client and then the socket pool. The socket pool is
    closed even when closing the Redis client raises, so a failure on the
    Redis side does not leave the pool open; the original exception still
    propagates to the caller.
    """
    try:
        self._redis_client.close()
    finally:
        self._socket_pool.close()
    logger.debug('Closed exchange (%s)', self)

register_agent

register_agent(
    behavior: type[BehaviorT],
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    name: str | None = None
) -> AgentId[BehaviorT]

Create a new agent identifier and associated mailbox.

Parameters:

  • behavior (type[BehaviorT]) –

    Type of the behavior this agent will implement.

  • agent_id (AgentId[BehaviorT] | None, default: None ) –

    Specify the ID of the agent. Randomly generated by default.

  • name (str | None, default: None ) –

    Optional human-readable name for the agent. Ignored if agent_id is provided.

Returns:

  • AgentId[BehaviorT]

    Unique identifier for the agent's mailbox.

Source code in academy/exchange/hybrid.py
def register_agent(
    self,
    behavior: type[BehaviorT],
    *,
    agent_id: AgentId[BehaviorT] | None = None,
    name: str | None = None,
) -> AgentId[BehaviorT]:
    """Register an agent with the exchange and create its mailbox.

    The mailbox status is set to active in Redis and the behavior's MRO
    is stored as a comma-separated string under the agent's behavior key.

    Args:
        behavior: Type of the behavior this agent will implement.
        agent_id: Identifier to register. A new one is generated when
            omitted.
        name: Optional human-readable name for the agent. Ignored if
            `agent_id` is provided.

    Returns:
        Unique identifier for the agent's mailbox.
    """
    if agent_id is None:
        agent_id = AgentId.new(name=name)

    client = self._redis_client
    client.set(self._status_key(agent_id), _MailboxState.ACTIVE.value)

    behavior_key = self._behavior_key(agent_id)
    mro = ','.join(behavior.behavior_mro())
    client.set(behavior_key, mro)

    logger.debug('Registered %s in %s', agent_id, self)
    return agent_id

register_client

register_client(*, name: str | None = None) -> ClientId

Create a new client identifier and associated mailbox.

Parameters:

  • name (str | None, default: None ) –

    Optional human-readable name for the client.

Returns:

  • ClientId

    Unique identifier for the client's mailbox.

Source code in academy/exchange/hybrid.py
def register_client(
    self,
    *,
    name: str | None = None,
) -> ClientId:
    """Register a client with the exchange and create its mailbox.

    A fresh client identifier is generated and its mailbox status is set
    to active in Redis.

    Args:
        name: Optional human-readable name for the client.

    Returns:
        Unique identifier for the client's mailbox.
    """
    client_id = ClientId.new(name=name)
    status_key = self._status_key(client_id)
    self._redis_client.set(status_key, _MailboxState.ACTIVE.value)
    logger.debug('Registered %s in %s', client_id, self)
    return client_id

terminate

terminate(uid: EntityId) -> None

Close the mailbox for an entity from the exchange.

This sets the state of the mailbox to inactive in the Redis server, and deletes any queued messages in Redis.

Parameters:

  • uid (EntityId) –

    Entity identifier of the mailbox to close.

Source code in academy/exchange/hybrid.py
def terminate(self, uid: EntityId) -> None:
    """Close the mailbox for an entity from the exchange.

    Marks the mailbox as inactive in the Redis server, pushes a close
    sentinel onto the mailbox's Redis queue, and, when `uid` is an agent
    identifier, deletes the agent's behavior key.

    Args:
        uid: Entity identifier of the mailbox to close.
    """
    client = self._redis_client
    client.set(self._status_key(uid), _MailboxState.INACTIVE.value)

    # The close sentinel is a quick way to wake up whichever entity is
    # blocked waiting on this mailbox's queue. This relies on there being
    # only one reader per mailbox.
    client.rpush(self._queue_key(uid), _CLOSE_SENTINEL)

    if isinstance(uid, AgentId):
        client.delete(self._behavior_key(uid))

    logger.debug('Closed mailbox for %s (%s)', uid, self)

discover

discover(
    behavior: type[Behavior],
    *,
    allow_subclasses: bool = True
) -> tuple[AgentId[Any], ...]

Discover peer agents with a given behavior.

Warning

This method is O(n) and scans all keys in the Redis server.

Parameters:

  • behavior (type[Behavior]) –

    Behavior type of interest.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the behavior.

Returns:

  • tuple[AgentId[Any], ...]

    Tuple of agent IDs implementing the behavior.

Source code in academy/exchange/hybrid.py
def discover(
    self,
    behavior: type[Behavior],
    *,
    allow_subclasses: bool = True,
) -> tuple[AgentId[Any], ...]:
    """Discover peer agents with a given behavior.

    Warning:
        This method is O(n) and scans all keys in the Redis server.

    Args:
        behavior: Behavior type of interest.
        allow_subclasses: Return agents implementing subclasses of the
            behavior.

    Returns:
        Tuple of agent IDs implementing the behavior whose mailbox status
        is active.
    """
    found: list[AgentId[Any]] = []
    fqp = f'{behavior.__module__}.{behavior.__name__}'
    for key in self._redis_client.scan_iter(
        f'{self._namespace}:behavior:*',
    ):
        mro_str = self._redis_client.get(key)
        if mro_str is None:
            # The behavior key can be deleted (e.g., by terminate())
            # between the scan and this read. That agent is gone, so skip
            # it rather than fail the whole discovery.
            continue
        # Narrow the type of the Redis response for the type checker.
        assert isinstance(mro_str, str)
        mro = mro_str.split(',')
        if fqp == mro[0] or (allow_subclasses and fqp in mro):
            # The last segment of the key is the base32-encoded agent UUID.
            aid: AgentId[Any] = AgentId(
                uid=base32_to_uuid(key.split(':')[-1]),
            )
            found.append(aid)
    # Only report agents whose mailbox is still marked active.
    active: list[AgentId[Any]] = []
    for aid in found:
        status = self._redis_client.get(self._status_key(aid))
        if status == _MailboxState.ACTIVE.value:  # pragma: no branch
            active.append(aid)
    return tuple(active)

get_mailbox

get_mailbox(uid: EntityId) -> HybridMailbox

Get a client to a specific mailbox.

Parameters:

  • uid (EntityId) –

    EntityId of the mailbox.

Returns:

  • HybridMailbox

    Mailbox client.

Raises:

  • BadEntityIdError –

    If a mailbox for uid does not exist.

Source code in academy/exchange/hybrid.py
def get_mailbox(self, uid: EntityId) -> HybridMailbox:
    """Create a mailbox client for an existing mailbox.

    Args:
        uid: EntityId of the mailbox.

    Returns:
        Mailbox client.

    Raises:
        BadEntityIdError: if a mailbox for `uid` does not exist.
    """
    # A mailbox exists only if a status value is stored for it in Redis.
    if self._redis_client.get(self._status_key(uid)) is None:
        raise BadEntityIdError(uid)
    return HybridMailbox(uid, self, interface=self._interface)

send

send(uid: EntityId, message: Message) -> None

Send a message to a mailbox.

To send a message, the client first checks that the state of the mailbox in Redis is active; otherwise, an error is raised. Then, the client checks to see if the peer entity is available by checking for an address of the peer in Redis. If the peer's address is found, the message is sent directly to the peer via TCP; otherwise, the message is put in a Redis queue for later retrieval.

Parameters:

  • uid (EntityId) –

    Destination address of the message.

  • message (Message) –

    Message to send.

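Raises:

  • BadEntityIdError –

    If a mailbox for uid does not exist.

  • MailboxClosedError –

    If the mailbox was closed.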

Source code in academy/exchange/hybrid.py
def send(self, uid: EntityId, message: Message) -> None:
    """Send a message to a mailbox.

    Delivery is attempted in the following order:

    1. If an address for `uid` is cached locally, the message is sent
       directly to that address without consulting Redis.
    2. Otherwise, or if the cached address fails, the mailbox status is
       read from Redis and an error is raised if the mailbox does not
       exist or is inactive.
    3. If Redis holds an address for the peer, the message is sent
       directly to the peer and the address is cached for later sends.
    4. If no address is found or the direct send fails, the message is
       pushed to the mailbox's Redis queue for later retrieval.

    Args:
        uid: Destination address of the message.
        message: Message to send.

    Raises:
        BadEntityIdError: if a mailbox for `uid` does not exist.
        MailboxClosedError: if the mailbox was closed.
    """
    cached_address = self._address_cache.get(uid, None)
    if cached_address is not None:
        try:
            # This is as optimistic as possible. If the address of the
            # peer is cached, we assume the mailbox is still active and
            # the peer is still listening.
            self._send_direct(cached_address, message)
        except (SocketClosedError, OSError):
            # Our optimism let us down so clear the cache and try the
            # standard flow. A default is passed so a missing entry is
            # not an error.
            self._address_cache.pop(uid, None)
        else:
            return

    status = self._redis_client.get(self._status_key(uid))
    if status is None:
        raise BadEntityIdError(uid)
    elif status == _MailboxState.INACTIVE.value:
        raise MailboxClosedError(uid)

    maybe_address = self._redis_client.get(self._address_key(uid))
    if isinstance(maybe_address, (bytes, str)):
        decoded_address = (
            maybe_address.decode('utf-8')
            if isinstance(maybe_address, bytes)
            else maybe_address
        )
        try:
            self._send_direct(decoded_address, message)
        except (SocketClosedError, OSError):
            # Direct delivery failed; fall through to the Redis queue.
            # Only transport errors are handled, matching the cached
            # path above, so programming errors are not masked.
            pass
        else:
            self._address_cache[uid] = decoded_address
            return

    # Either no peer address is registered or the direct send failed, so
    # queue the message in Redis.
    self._redis_client.rpush(
        self._queue_key(uid),
        message.model_serialize(),
    )
    logger.debug(
        'Sent %s to %s via redis',
        type(message).__name__,
        uid,
    )

get_handle

get_handle(
    aid: AgentId[BehaviorT],
) -> UnboundRemoteHandle[BehaviorT]

Create a new handle to an agent.

A handle enables a client to invoke actions on the agent.

Note

It is not possible to create a handle to a client since a handle is essentially a new client of a specific agent.

Parameters:

  • aid (AgentId[BehaviorT]) –

    EntityId of the agent to create a handle to.

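Returns:

  • UnboundRemoteHandle[BehaviorT]

    Handle to the agent.

Raises:

  • TypeError –

    If aid is not an instance of AgentId.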

Source code in academy/exchange/__init__.py
def get_handle(
    self: Exchange,
    aid: AgentId[BehaviorT],
) -> UnboundRemoteHandle[BehaviorT]:
    """Create a new, unbound handle to an agent.

    Clients use a handle to invoke actions on the agent.

    Note:
        Handles can only target agents: a handle is essentially a new
        client of a specific agent, so a handle to a client cannot be
        created.

    Args:
        aid: EntityId of the agent to create a handle to.

    Returns:
        Handle to the agent.

    Raises:
        TypeError: if `aid` is not an instance of
            [`AgentId`][academy.identifier.AgentId].
    """
    if isinstance(aid, AgentId):
        return UnboundRemoteHandle(self, aid)
    raise TypeError(
        f'Handle must be created from an {AgentId.__name__} '
        f'but got identifier with type {type(aid).__name__}.',
    )

HybridMailbox

HybridMailbox(
    uid: EntityId,
    exchange: HybridExchange,
    *,
    interface: str | None = None,
    port: int | None = None
)

Bases: NoPickleMixin

Client protocol that listens to incoming messages to a mailbox.

This class acts as the endpoint for messages sent to a particular mailbox. This is done via starting two threads once initialized: (1) a socket server thread that listens for messages from peers, and (2) a thread that checks the Redis server for any offline messages and state changes to the mailbox (i.e., mailbox closure).

Parameters:

  • uid (EntityId) –

    EntityId of the mailbox.

  • exchange (HybridExchange) –

    Exchange client.

  • interface (str | None, default: None ) –

    Network interface to use for peer-to-peer communication. If None, the hostname of the local host is used.

  • port (int | None, default: None ) –

    Port to use for peer-to-peer communication. Randomly selected if None.

Source code in academy/exchange/hybrid.py
def __init__(
    self,
    uid: EntityId,
    exchange: HybridExchange,
    *,
    interface: str | None = None,
    port: int | None = None,
) -> None:
    # Set up the mailbox endpoint: start the peer-facing socket server,
    # publish its address in Redis, then start the Redis watcher thread.
    self._uid = uid
    self._exchange = exchange
    self._interface = interface
    # Local queue of received messages; recv() reads from it.
    self._messages: Queue[Message] = Queue()

    # Set by close() to signal that this mailbox client is shutting down.
    self._closed = threading.Event()
    self._socket_poll_timeout_ms = _SOCKET_POLL_TIMEOUT_MS

    # Use the address of the requested network interface when one is
    # given; otherwise fall back to the address of the local hostname.
    host = (
        address_by_interface(interface)
        if interface is not None
        else address_by_hostname()
    )
    self._server = SimpleSocketServer(
        handler=self._server_handler,
        host=host,
        port=port,
        timeout=_THREAD_JOIN_TIMEOUT,
    )
    self._server.start_server_thread()

    # Publish this mailbox's "host:port" under its address key. The
    # exchange's send() reads this key to locate the peer, so it is
    # written only after the server thread has been started.
    self.exchange._redis_client.set(
        self.exchange._address_key(uid),
        f'{self._server.host}:{self._server.port}',
    )

    self._redis_thread = threading.Thread(
        target=self._redis_watcher,
        name=f'hybrid-mailbox-redis-watcher-{uid}',
    )
    # The event must exist before the thread starts.
    # NOTE(review): presumably `_redis_watcher` sets it once running; that
    # method is not visible here, so confirm.
    self._redis_started = threading.Event()
    self._redis_thread.start()
    # Wait (bounded) for the watcher to signal start-up. The result of
    # wait() is not checked, so construction continues even on timeout.
    self._redis_started.wait(timeout=_THREAD_START_TIMEOUT)

exchange property

exchange: HybridExchange

Exchange client.

mailbox_id property

mailbox_id: EntityId

Mailbox address/identifier.

close

close() -> None

Close this mailbox client.

Warning

This does not close the mailbox in the exchange. I.e., the exchange will still accept new messages to this mailbox, but this client will no longer be listening for them.

Source code in academy/exchange/hybrid.py
def close(self) -> None:
    """Shut down this mailbox client.

    Signals closure, removes this client's address from Redis, stops the
    socket server thread, waits for the Redis watcher thread to exit, and
    finally closes the local message queue.

    Warning:
        This does not close the mailbox in the exchange. I.e., the exchange
        will still accept new messages to this mailbox, but this client
        will no longer be listening for them.

    Raises:
        TimeoutError: if the Redis watcher thread does not exit within
            the join timeout.
    """
    self._closed.set()

    # Withdraw the published address so it is no longer stored in Redis.
    exchange = self.exchange
    exchange._redis_client.delete(exchange._address_key(self.mailbox_id))

    self._server.stop_server_thread()

    watcher = self._redis_thread
    watcher.join(_THREAD_JOIN_TIMEOUT)
    if watcher.is_alive():  # pragma: no cover
        raise TimeoutError(
            'Redis watcher thread failed to exit within '
            f'{_THREAD_JOIN_TIMEOUT} seconds.',
        )

    self._messages.close()

recv

recv(timeout: float | None = None) -> Message

Receive the next message in the mailbox.

This blocks until the next message is received or the mailbox is closed.

Parameters:

  • timeout (float | None, default: None ) –

    Optional timeout in seconds to wait for the next message. If None, the default, block forever until the next message or the mailbox is closed. Note that this will be cast to an int which is required by the Redis API.

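Raises:

  • MailboxClosedError –

    If the mailbox was closed.

  • TimeoutError –

    If a timeout was specified and exceeded.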

Source code in academy/exchange/hybrid.py
def recv(self, timeout: float | None = None) -> Message:
    """Return the next message from the mailbox.

    Blocks until a message is available or the mailbox is closed.
    Messages are read from this client's local message queue.

    Args:
        timeout: Optional timeout in seconds to wait for the next
            message. If `None`, the default, block forever until the
            next message or the mailbox is closed.

    Raises:
        MailboxClosedError: if the mailbox was closed.
        TimeoutError: if a `timeout` was specified and exceeded.
    """
    try:
        message = self._messages.get(timeout=timeout)
    except QueueClosedError:
        # Report a closed queue as a closed mailbox and hide the
        # internal queue exception from the traceback.
        raise MailboxClosedError(self.mailbox_id) from None
    else:
        return message

base32_to_uuid

base32_to_uuid(uid: str) -> UUID

Parse a base32 string as a UUID.

Source code in academy/exchange/hybrid.py
def base32_to_uuid(uid: str) -> uuid.UUID:
    """Decode a base32 string, with or without padding, into a UUID."""
    # The base32 decoder needs input whose length is a multiple of eight,
    # so restore any '=' padding that was trimmed from the string.
    missing = -len(uid) % 8
    raw = base64.b32decode(uid + '=' * missing)
    return uuid.UUID(bytes=raw)

uuid_to_base32

uuid_to_base32(uid: UUID) -> str

Encode a UUID as a trimmed base32 string.

Source code in academy/exchange/hybrid.py
def uuid_to_base32(uid: uuid.UUID) -> str:
    """Return the base32 text of a UUID with trailing padding removed."""
    encoded = base64.b32encode(uid.bytes).decode('utf-8')
    # Drop the trailing '=' padding characters.
    return encoded.rstrip('=')