Skip to content

academy.exchange.hybrid

HybridAgentRegistration dataclass

HybridAgentRegistration(agent_id: AgentId[AgentT])

Bases: Generic[AgentT]

Agent registration for hybrid exchanges.

agent_id instance-attribute

agent_id: AgentId[AgentT]

Unique identifier for the agent created by the exchange.

HybridExchangeTransport

HybridExchangeTransport(
    mailbox_id: EntityId,
    redis_client: Redis,
    *,
    redis_info: _RedisConnectionInfo,
    namespace: str,
    host: str,
    port: int,
    interface: str | None = None
)

Bases: ExchangeTransportMixin, NoPickleMixin

Hybrid exchange transport bound to a specific mailbox.

Source code in academy/exchange/hybrid.py
def __init__(  # noqa: PLR0913
    self,
    mailbox_id: EntityId,
    redis_client: redis.asyncio.Redis,
    *,
    redis_info: _RedisConnectionInfo,
    namespace: str,
    host: str,
    port: int,
    interface: str | None = None,
) -> None:
    self._mailbox_id = mailbox_id
    self._redis_client = redis_client
    self._redis_info = redis_info
    self._namespace = namespace
    self._host = host
    self._port = port
    self._interface = interface

    self._address_cache: dict[EntityId, str] = {}
    if sys.version_info >= (3, 13):  # pragma: >=3.13 cover
        self._messages: AsyncQueue[Message] = Queue()
    else:  # pragma: <3.13 cover
        self._messages: AsyncQueue[Message] = Queue().async_q
    self._socket_pool = SocketPool()
    self._started = asyncio.Event()
    self._shutdown = asyncio.Event()

    self._server = SimpleSocketServer(
        handler=self._direct_message_handler,
        host=host,
        port=port,
    )
    self._server_task = asyncio.create_task(
        self._run_direct_server(),
        name=f'hybrid-transport-direct-server-{self.mailbox_id}',
    )
    self._redis_task = asyncio.create_task(
        self._run_redis_listener(),
        name=f'hybrid-transport-redis-watcher-{self.mailbox_id}',
    )

new async classmethod

new(
    *,
    namespace: str,
    redis_info: _RedisConnectionInfo,
    interface: str | None = None,
    mailbox_id: EntityId | None = None,
    name: str | None = None,
    port: int | None = None
) -> Self

Instantiate a new transport.

Parameters:

  • namespace (str) –

    Redis key namespace.

  • redis_info (_RedisConnectionInfo) –

    Redis connection information.

  • interface (str | None, default: None ) –

    Network interface use for peer-to-peer communication. If None, the hostname of the local host is used.

  • mailbox_id (EntityId | None, default: None ) –

    Bind the transport to the specific mailbox. If None, a new user entity will be registered and the transport will be bound to that mailbox.

  • name (str | None, default: None ) –

    Display name of the registered entity if mailbox_id is None.

  • port (int | None, default: None ) –

    Port to listen for peer connection on.

Returns:

  • Self

    An instantiated transport bound to a specific mailbox.

Raises:

Source code in academy/exchange/hybrid.py
@classmethod
async def new(  # noqa: PLR0913
    cls,
    *,
    namespace: str,
    redis_info: _RedisConnectionInfo,
    interface: str | None = None,
    mailbox_id: EntityId | None = None,
    name: str | None = None,
    port: int | None = None,
) -> Self:
    """Instantiate a new transport.

    Args:
        namespace: Redis key namespace.
        redis_info: Redis connection information.
        interface: Network interface use for peer-to-peer communication.
            If `None`, the hostname of the local host is used.
        mailbox_id: Bind the transport to the specific mailbox. If `None`,
            a new user entity will be registered and the transport will be
            bound to that mailbox.
        name: Display name of the registered entity if `mailbox_id` is
            `None`.
        port: Port to listen for peer connection on.

    Returns:
        An instantiated transport bound to a specific mailbox.

    Raises:
        redis.exceptions.ConnectionError: If the Redis server is not
            reachable.
    """
    host = (
        address_by_interface(interface)
        if interface is not None
        else address_by_hostname()
    )
    port = port if port is not None else open_port()

    client = redis.asyncio.Redis(
        host=redis_info.hostname,
        port=redis_info.port,
        decode_responses=False,
        **redis_info.kwargs,
    )
    # Ensure the redis server is reachable else fail early
    await client.ping()

    if mailbox_id is None:
        mailbox_id = UserId.new(name=name)
        await client.set(
            f'{namespace}:status:{uuid_to_base32(mailbox_id.uid)}',
            _MailboxState.ACTIVE.value,
        )
        logger.info('Registered %s in exchange', mailbox_id)

    await client.set(
        f'{namespace}:address:{uuid_to_base32(mailbox_id.uid)}',
        f'{host}:{port}',
    )

    transport = cls(
        mailbox_id,
        client,
        redis_info=redis_info,
        namespace=namespace,
        interface=interface,
        host=host,
        port=port,
    )
    # Wait for the direct message server to start
    await asyncio.wait_for(transport._started.wait(), timeout=5)
    return transport

HybridExchangeFactory

HybridExchangeFactory(
    redis_host: str,
    redis_port: int,
    *,
    redis_kwargs: dict[str, Any] | None = None,
    interface: str | None = None,
    namespace: str | None = "default",
    ports: Iterable[int] | None = None
)

Bases: ExchangeFactory[HybridExchangeTransport]

Hybrid exchange client factory.

The hybrid exchange uses peer-to-peer communication via TCP and a central Redis server for mailbox state and queueing messages for offline entities.

Parameters:

  • redis_host (str) –

    Redis server hostname.

  • redis_port (int) –

    Redis server port.

  • redis_kwargs (dict[str, Any] | None, default: None ) –

    Extra keyword arguments to pass to redis.Redis().

  • interface (str | None, default: None ) –

    Network interface use for peer-to-peer communication. If None, the hostname of the local host is used.

  • namespace (str | None, default: 'default' ) –

    Redis key namespace. If None a random key prefix is generated.

  • ports (Iterable[int] | None, default: None ) –

    An iterable of ports to give each client a unique port from a user defined set. A StopIteration exception will be raised in create_*_client() methods if the number of clients in the process is greater than the length of the iterable.

Source code in academy/exchange/hybrid.py
def __init__(  # noqa: PLR0913
    self,
    redis_host: str,
    redis_port: int,
    *,
    redis_kwargs: dict[str, Any] | None = None,
    interface: str | None = None,
    namespace: str | None = 'default',
    ports: Iterable[int] | None = None,
) -> None:
    self._namespace = (
        namespace
        if namespace is not None
        else uuid_to_base32(uuid.uuid4())
    )
    self._interface = interface
    self._redis_info = _RedisConnectionInfo(
        redis_host,
        redis_port,
        redis_kwargs if redis_kwargs is not None else {},
    )
    self._ports = None if ports is None else iter(ports)

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler,
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = factory.create_user_client()
registration = user_client.register_agent(...)
agent_client = factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler) –

    Agent request message handler.

Returns:

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/__init__.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler,
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = factory.create_user_client()
    registration = user_client.register_agent(...)
    agent_client = factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    assert transport.mailbox_id == agent_id
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        await transport.close()
        raise BadEntityIdError(agent_id)
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Returns:

Source code in academy/exchange/__init__.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a new user in the exchange and associated client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Start a message listener thread.

    Returns:
        User exchange client.
    """
    transport = await self._create_transport(mailbox_id=None, name=name)
    user_id = transport.mailbox_id
    assert isinstance(user_id, UserId)
    return UserExchangeClient(
        user_id,
        transport,
        start_listener=start_listener,
    )

base32_to_uuid

base32_to_uuid(uid: str) -> UUID

Parse a base32 string as a UUID.

Source code in academy/exchange/hybrid.py
def base32_to_uuid(uid: str) -> uuid.UUID:
    """Parse a base32 string as a UUID."""
    padding = '=' * ((8 - len(uid) % 8) % 8)
    padded = uid + padding
    uid_bytes = base64.b32decode(padded)
    return uuid.UUID(bytes=uid_bytes)

uuid_to_base32

uuid_to_base32(uid: UUID) -> str

Encode a UUID as a trimmed base32 string.

Source code in academy/exchange/hybrid.py
def uuid_to_base32(uid: uuid.UUID) -> str:
    """Encode a UUID as a trimmed base32 string."""
    uid_bytes = uid.bytes
    base32_bytes = base64.b32encode(uid_bytes).rstrip(b'=')
    return base32_bytes.decode('utf-8')