Skip to content

academy.exchange.cloud.server

HTTP message exchange client and server.

To start the exchange:

python -m academy.exchange.cloud --config exchange.yaml

Connect to the exchange through the client.

from academy.exchange.cloud.client import HttpExchange

# The client is used as a context manager around all exchange calls.
with HttpExchange('localhost', 1234) as exchange:
    # Register an agent, then use the returned id to get its mailbox.
    aid = exchange.register_agent()
    mailbox = exchange.get_mailbox(aid)
    ...
    # Close the mailbox when finished with it.
    mailbox.close()

authenticate_factory

authenticate_factory(authenticator: Authenticator) -> Any

Create an authentication middleware for a given authenticator.

Parameters:

  • authenticator (Authenticator) –

    Used to validate client id and transform token into id.

Returns:

  • Any

    An aiohttp.web.middleware function that will only allow authenticated requests.

Source code in academy/exchange/cloud/server.py
def authenticate_factory(
    authenticator: Authenticator,
) -> Any:
    """Build an aiohttp middleware that gates requests on an authenticator.

    The middleware passes the request headers to
    `authenticator.authenticate_user`. On success, the returned client id
    is written to the `client_id` header of a cloned request, which is
    then forwarded to the handler. On failure, a response is returned
    without calling the handler.

    Args:
        authenticator: Used to validate client id and transform token into
            id.

    Returns:
        An aiohttp.web.middleware function that will only allow
            authenticated requests.
    """

    @middleware
    async def authenticate(
        request: Request,
        handler: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        event_loop = asyncio.get_running_loop()
        try:
            # The globus client blocks, so the check is pushed onto the
            # loop's default executor instead of running on the loop.
            client_uuid: uuid.UUID = await event_loop.run_in_executor(
                None,
                authenticator.authenticate_user,
                request.headers,
            )
        except ForbiddenError:
            return Response(
                text='Token expired or revoked.',
                status=_FORBIDDEN_CODE,
            )
        except UnauthorizedError:
            return Response(
                text='Missing required headers.',
                status=_UNAUTHORIZED_CODE,
            )
        else:
            # Forward a clone whose client_id header carries the
            # authenticated id.
            identified_headers = request.headers.copy()
            identified_headers['client_id'] = str(client_uuid)
            identified_request = request.clone(headers=identified_headers)
            return await handler(identified_request)

    return authenticate

create_app

create_app(
    auth_config: ExchangeAuthConfig | None = None,
) -> Application

Create a new server application.

Source code in academy/exchange/cloud/server.py
def create_app(
    auth_config: ExchangeAuthConfig | None = None,
) -> Application:
    """Create a new server application.

    Args:
        auth_config: Optional authentication configuration. When provided,
            an authentication middleware built from it is installed; when
            `None`, the application is created without middleware.

    Returns:
        Application that stores a new `_MailboxManager` under
            `MANAGER_KEY` and has the mailbox, message, and discover
            routes registered.
    """
    middlewares: list[Any]
    if auth_config is None:
        middlewares = []
    else:
        middlewares = [authenticate_factory(get_authenticator(auth_config))]

    mailbox_manager = _MailboxManager()
    app = Application(middlewares=middlewares)
    app[MANAGER_KEY] = mailbox_manager

    router = app.router
    # Each entry is (registration method, path, handler), listed in the
    # order the routes are added.
    route_table = (
        (router.add_post, '/mailbox', _create_mailbox_route),
        (router.add_delete, '/mailbox', _terminate_route),
        (router.add_get, '/mailbox', _check_mailbox_route),
        (router.add_put, '/message', _send_message_route),
        (router.add_get, '/message', _recv_message_route),
        (router.add_get, '/discover', _discover_route),
    )
    for register, path, route_handler in route_table:
        register(path, route_handler)

    return app

serve_app async

serve_app(
    app: Application, host: str, port: int
) -> AsyncGenerator[None]

Serve an application as a context manager.

Parameters:

  • app (Application) –

    Application to run.

  • host (str) –

    Host to bind to.

  • port (int) –

    Port to bind to.

Source code in academy/exchange/cloud/server.py
@contextlib.asynccontextmanager
async def serve_app(
    app: Application,
    host: str,
    port: int,
) -> AsyncGenerator[None]:
    """Run an application on a TCP site for the duration of the context.

    Args:
        app: Application to run.
        host: Host to bind to.
        port: Port to bind to.

    Yields:
        Nothing; control returns to the caller once the site has started.
    """
    app_runner = AppRunner(app)
    try:
        await app_runner.setup()
        await TCPSite(app_runner, host, port).start()
        logger.info('Exchange listening on %s:%s', host, port)
        yield
    finally:
        # Runs both when startup fails and when the context body exits.
        await app_runner.cleanup()
        logger.info('Exchange closed!')