Skip to content

academy.exchange.cloud.client

HttpAgentRegistration dataclass

HttpAgentRegistration(agent_id: AgentId[AgentT])

Bases: Generic[AgentT]

Agent registration for Http exchanges.

agent_id instance-attribute

agent_id: AgentId[AgentT]

Unique identifier for the agent created by the exchange.

HttpExchangeTransport

HttpExchangeTransport(
    mailbox_id: EntityId,
    session: ClientSession,
    connection_info: _HttpConnectionInfo,
)

Bases: ExchangeTransportMixin, NoPickleMixin

Http exchange client.

Parameters:

  • mailbox_id (EntityId) –

    Identifier of the mailbox on the exchange. If there is not an id provided, the exchange will create a new client mailbox.

  • session (ClientSession) –

    Http session.

  • connection_info (_HttpConnectionInfo) –

    Exchange connection info.

Source code in academy/exchange/cloud/client.py
def __init__(
    self,
    mailbox_id: EntityId,
    session: aiohttp.ClientSession,
    connection_info: _HttpConnectionInfo,
) -> None:
    """Bind the transport to a mailbox, a session, and an exchange.

    Args:
        mailbox_id: Identifier of the mailbox this transport is bound to.
        session: Http session stored for use by this transport.
        connection_info: Exchange connection info; its `url` is the base
            from which the endpoint URLs are derived.
    """
    self._info = connection_info
    self._session = session
    self._mailbox_id = mailbox_id

    # Precompute the endpoint URLs once from the exchange base URL.
    exchange_root = connection_info.url
    self._discover_url = f'{exchange_root}/discover'
    self._message_url = f'{exchange_root}/message'
    self._mailbox_url = f'{exchange_root}/mailbox'

new async classmethod

new(
    *,
    connection_info: _HttpConnectionInfo,
    mailbox_id: EntityId | None = None,
    name: str | None = None
) -> Self

Instantiate a new transport.

Parameters:

  • connection_info (_HttpConnectionInfo) –

    Exchange connection information.

  • mailbox_id (EntityId | None, default: None ) –

    Bind the transport to the specific mailbox. If None, a new user entity will be registered and the transport will be bound to that mailbox.

  • name (str | None, default: None ) –

    Display name of the registered entity if mailbox_id is None.

Returns:

  • Self

    An instantiated transport bound to a specific mailbox.

Source code in academy/exchange/cloud/client.py
@classmethod
async def new(
    cls,
    *,
    connection_info: _HttpConnectionInfo,
    mailbox_id: EntityId | None = None,
    name: str | None = None,
) -> Self:
    """Instantiate a new transport.

    Args:
        connection_info: Exchange connection information.
        mailbox_id: Bind the transport to the specific mailbox. If `None`,
            a new user entity will be registered and the transport will be
            bound to that mailbox.
        name: Display name of the registered entity if `mailbox_id` is
            `None`.

    Returns:
        An instantiated transport bound to a specific mailbox.

    Raises:
        Exception: Any error raised while registering the new mailbox
            (the request itself or `_raise_for_status`) is propagated
            after the freshly created session has been closed.
    """
    ssl_verify = connection_info.ssl_verify
    if ssl_verify is None:  # pragma: no branch
        # No explicit preference: enable verification only for https URLs.
        scheme = urlparse(connection_info.url).scheme
        ssl_verify = scheme == 'https'

    session = aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=ssl_verify),
        headers=connection_info.additional_headers,
    )

    if mailbox_id is None:
        mailbox_id = UserId.new(name=name)
        try:
            async with session.post(
                f'{connection_info.url}/mailbox',
                json={'mailbox': mailbox_id.model_dump_json()},
            ) as response:
                _raise_for_status(response, mailbox_id)
        except BaseException:
            # No transport owns the session yet, so nothing else would
            # close it if registration fails.
            await session.close()
            raise
        logger.info('Registered %s in exchange', mailbox_id)

    return cls(mailbox_id, session, connection_info)

HttpExchangeFactory

HttpExchangeFactory(
    url: str,
    auth_method: Literal["globus"] | None = None,
    additional_headers: dict[str, str] | None = None,
    ssl_verify: bool | None = None,
)

Bases: ExchangeFactory[HttpExchangeTransport]

Http exchange client factory.

Parameters:

  • url (str) –

    Address of HTTP exchange

  • auth_method (Literal['globus'] | None, default: None ) –

    Method to get authorization headers

  • additional_headers (dict[str, str] | None, default: None ) –

    Any other information necessary to communicate with the exchange. Used for passing the Globus bearer token

  • ssl_verify (bool | None, default: None ) –

    Same as requests.Session.verify. Whether the server's TLS certificate should be validated. Should be true if using HTTPS. Only set to false for testing or local development.

Source code in academy/exchange/cloud/client.py
def __init__(
    self,
    url: str,
    auth_method: Literal['globus'] | None = None,
    additional_headers: dict[str, str] | None = None,
    ssl_verify: bool | None = None,
) -> None:
    """Build the connection info used to create transports.

    Args:
        url: Address of HTTP exchange.
        auth_method: Method to get authorization headers.
        additional_headers: Extra headers to send with each request. The
            mapping passed in is copied and never modified; headers from
            `get_auth_headers(auth_method)` take precedence on key
            conflicts.
        ssl_verify: Whether the server's TLS certificate should be
            validated. `None` defers the decision to the transport.
    """
    # Merge into a copy: an in-place `|=` would write the auth headers
    # into the dict owned by the caller.
    headers = dict(additional_headers) if additional_headers is not None else {}
    headers.update(get_auth_headers(auth_method))

    self._info = _HttpConnectionInfo(
        url=url,
        additional_headers=headers,
        ssl_verify=ssl_verify,
    )

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler,
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = factory.create_user_client()
registration = user_client.register_agent(...)
agent_client = factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler) –

    Agent request message handler.

Returns:

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/__init__.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler,
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = await factory.create_user_client()
    registration = user_client.register_agent(...)
    agent_client = await factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    try:
        assert transport.mailbox_id == agent_id
        status = await transport.status(agent_id)
        if status != MailboxStatus.ACTIVE:
            raise BadEntityIdError(agent_id)
    except BaseException:
        # Until the client is constructed nothing else owns the transport,
        # so close it on any failure (not only a non-active mailbox).
        await transport.close()
        raise
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Returns:

Source code in academy/exchange/__init__.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a user exchange client backed by a fresh transport.

    A transport is created without a mailbox id, and the mailbox id the
    transport reports afterwards becomes the identity of the client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Forwarded to the client; controls whether a
            message listener is started.

    Returns:
        User exchange client.
    """
    user_transport = await self._create_transport(name=name, mailbox_id=None)
    new_user_id = user_transport.mailbox_id
    # A transport created without a mailbox id is expected to be bound to
    # a user mailbox.
    assert isinstance(new_user_id, UserId)
    client = UserExchangeClient(
        new_user_id,
        user_transport,
        start_listener=start_listener,
    )
    return client

spawn_http_exchange

spawn_http_exchange(
    host: str = "0.0.0.0",
    port: int = 5463,
    *,
    level: int | str = WARNING,
    timeout: float | None = None
) -> Generator[HttpExchangeFactory]

Context manager that spawns an HTTP exchange in a subprocess.

This function spawns a new process (rather than forking) and waits to return until a connection with the exchange has been established. When exiting the context manager, the exchange process will be asked to terminate (SIGTERM on POSIX). If the process does not exit within 5 seconds, it will be killed.

Warning

The exclusion of authentication and ssl configuration is intentional. This method should only be used for temporary exchanges in trusted environments (i.e. the login node of a cluster).

Parameters:

  • host (str, default: '0.0.0.0' ) –

    Host the exchange should listen on.

  • port (int, default: 5463 ) –

    Port the exchange should listen on.

  • level (int | str, default: WARNING ) –

    Logging level.

  • timeout (float | None, default: None ) –

    Connection timeout when waiting for exchange to start.

Returns:

Source code in academy/exchange/cloud/client.py
@contextlib.contextmanager
def spawn_http_exchange(
    host: str = '0.0.0.0',
    port: int = 5463,
    *,
    level: int | str = logging.WARNING,
    timeout: float | None = None,
) -> Generator[HttpExchangeFactory]:
    """Context manager that spawns an HTTP exchange in a subprocess.

    This function spawns a new process (rather than forking) and waits to
    return until a connection with the exchange has been established.
    When exiting the context manager, the exchange process is asked to
    terminate (`Process.terminate()`, i.e. `SIGTERM` on POSIX). If the
    process does not exit within 5 seconds, it will be killed. The same
    cleanup runs if the exchange never becomes reachable.

    Warning:
        The exclusion of authentication and ssl configuration is
        intentional. This method should only be used for temporary exchanges
        in trusted environments (e.g. the login node of a cluster).

    Args:
        host: Host the exchange should listen on.
        port: Port the exchange should listen on.
        level: Logging level.
        timeout: Connection timeout when waiting for exchange to start.

    Returns:
        Exchange interface connected to the spawned exchange.
    """
    # Fork is not safe in multi-threaded context. A dedicated context is
    # used instead of set_start_method(), which raises RuntimeError when
    # called a second time and overrides the process-wide start method.
    spawn_context = multiprocessing.get_context('spawn')

    config = ExchangeServingConfig(host=host, port=port, log_level=level)
    exchange_process = spawn_context.Process(
        target=_run,
        args=(config,),
    )
    exchange_process.start()

    # Everything after start() is guarded so the child process is always
    # cleaned up, including when waiting for the connection fails.
    try:
        logger.info('Starting exchange server...')
        wait_connection(host, port, timeout=timeout)
        logger.info('Started exchange server!')

        base_url = f'http://{host}:{port}'
        yield HttpExchangeFactory(base_url)
    finally:
        logger.info('Terminating exchange server...')
        wait = 5
        exchange_process.terminate()
        exchange_process.join(timeout=wait)
        if exchange_process.exitcode is None:  # pragma: no cover
            logger.info(
                'Killing exchange server after waiting %s seconds',
                wait,
            )
            exchange_process.kill()
            # close() raises ValueError while the process is still
            # running, so reap it first.
            exchange_process.join()
        else:
            logger.info('Terminated exchange server!')
        exchange_process.close()