Skip to content

academy.exchange.cloud.server

HTTP message exchange client and server.

To start the exchange:

python -m academy.exchange.cloud --config exchange.yaml

Connect to the exchange through the client.

from academy.exchange.cloud.client import HttpExchangeFactory

with HttpExchangeFactory(
    'http://localhost:1234'
).create_user_client() as exchange:
    aid, agent_info = exchange.register_agent()
    ...

StatusCode

Bases: Enum

Http status codes.

authenticate_factory

authenticate_factory(authenticator: Authenticator) -> Any

Create an authentication middleware for a given authenticator.

Parameters:

  • authenticator (Authenticator) –

    Used to validate client id and transform token into id.

Returns:

  • Any

    A aiohttp.web.middleware function that will only allow authenticated requests.

Source code in academy/exchange/cloud/server.py
def authenticate_factory(
    authenticator: Authenticator,
) -> Any:
    """Create an authentication middleware for a given authenticator.

    Args:
        authenticator: Used to validate client id and transform token into id.

    Returns:
        A aiohttp.web.middleware function that will only allow authenticated
            requests.
    """

    @middleware
    async def authenticate(
        request: Request,
        handler: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        loop = asyncio.get_running_loop()
        try:
            # Needs to be run in executor because globus client is blocking
            client_uuid: uuid.UUID = await loop.run_in_executor(
                None,
                authenticator.authenticate_user,
                request.headers,
            )
        except ForbiddenError:
            return Response(
                status=StatusCode.FORBIDDEN.value,
                text='Token expired or revoked.',
            )
        except UnauthorizedError:
            return Response(
                status=StatusCode.UNAUTHORIZED.value,
                text='Missing required headers.',
            )

        headers = request.headers.copy()
        headers['client_id'] = str(client_uuid)

        # Handle early client-side disconnect in Issue #142
        # This is somewhat hard to reproduce in tests:
        # https://github.com/aio-libs/aiohttp/issues/6978
        if (
            request.transport is None or request.transport.is_closing()
        ):  # pragma: no cover
            return Response(status=StatusCode.NO_RESPONSE.value)

        request = request.clone(headers=headers)
        return await handler(request)

    return authenticate

create_app

create_app(
    auth_config: ExchangeAuthConfig | None = None,
) -> Application

Create a new server application.

Source code in academy/exchange/cloud/server.py
def create_app(
    auth_config: ExchangeAuthConfig | None = None,
) -> Application:
    """Create a new server application."""
    middlewares = []
    if auth_config is not None:
        authenticator = get_authenticator(auth_config)
        middlewares.append(authenticate_factory(authenticator))

    manager = _MailboxManager()
    app = Application(middlewares=middlewares)
    app[MANAGER_KEY] = manager

    app.router.add_post('/mailbox', _create_mailbox_route)
    app.router.add_delete('/mailbox', _terminate_route)
    app.router.add_get('/mailbox', _check_mailbox_route)
    app.router.add_put('/message', _send_message_route)
    app.router.add_get('/message', _recv_message_route)
    app.router.add_get('/discover', _discover_route)

    return app