HTTP message exchange client and server.
To start the exchange:
python -m academy.exchange.cloud --config exchange.yaml
Connect to the exchange through the client.
from academy.exchange.cloud.client import HttpExchange
with HttpExchange('localhost', 1234) as exchange:
    aid = exchange.register_agent()
    mailbox = exchange.get_mailbox(aid)
    ...
    mailbox.close()
authenticate_factory
Create an authentication middleware for a given authenticator.
Parameters:
-
authenticator
(Authenticator
)
–
Used to validate client id and transform token into id.
Returns:
-
Any
–
An aiohttp.web.middleware function that will only allow authenticated
requests.
Source code in academy/exchange/cloud/server.py
def authenticate_factory(
    authenticator: Authenticator,
) -> Any:
    """Build an aiohttp middleware that authenticates every request.

    The middleware hands the request headers to
    ``authenticator.authenticate_user``. On success, the returned client
    id is written (as a string) to a ``client_id`` header on a clone of
    the request, and that clone is forwarded to the next handler. On
    failure the handler is never invoked and an error response is
    returned instead.

    Args:
        authenticator: Used to validate client id and transform token
            into id.

    Returns:
        An aiohttp.web.middleware function that will only allow
        authenticated requests.
    """

    @middleware
    async def authenticate(
        request: Request,
        handler: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        event_loop = asyncio.get_running_loop()
        try:
            # Off-loaded to the default executor so the event loop is not
            # stalled: the globus client is blocking.
            user_id: uuid.UUID = await event_loop.run_in_executor(
                None,
                authenticator.authenticate_user,
                request.headers,
            )
        except ForbiddenError:
            return Response(
                status=_FORBIDDEN_CODE,
                text='Token expired or revoked.',
            )
        except UnauthorizedError:
            return Response(
                status=_UNAUTHORIZED_CODE,
                text='Missing required headers.',
            )
        else:
            # The incoming request is left untouched; the resolved id is
            # attached to a copy of its headers on a cloned request.
            augmented_headers = request.headers.copy()
            augmented_headers['client_id'] = str(user_id)
            return await handler(request.clone(headers=augmented_headers))

    return authenticate
|
create_app
Create a new server application.
Source code in academy/exchange/cloud/server.py
def create_app(
    auth_config: ExchangeAuthConfig | None = None,
) -> Application:
    """Create a new server application.

    Args:
        auth_config: Optional authentication configuration. When
            provided, an authenticator is obtained from it and wrapped
            as the application's single middleware. When ``None``, the
            application is created without any middleware.

    Returns:
        An application holding a new mailbox manager under
        ``MANAGER_KEY`` with the mailbox, message and discover routes
        registered.
    """
    if auth_config is None:
        middlewares = []
    else:
        middlewares = [authenticate_factory(get_authenticator(auth_config))]

    mailbox_manager = _MailboxManager()
    app = Application(middlewares=middlewares)
    app[MANAGER_KEY] = mailbox_manager

    router = app.router
    # (registration method, path, handler), applied in declaration order.
    route_table = (
        (router.add_post, '/mailbox', _create_mailbox_route),
        (router.add_delete, '/mailbox', _terminate_route),
        (router.add_get, '/mailbox', _check_mailbox_route),
        (router.add_put, '/message', _send_message_route),
        (router.add_get, '/message', _recv_message_route),
        (router.add_get, '/discover', _discover_route),
    )
    for register, path, route_handler in route_table:
        register(path, route_handler)

    return app
|
serve_app
async
Serve an application as a context manager.
Parameters:
-
app
(Application
)
–
Application to run.
-
host
(str
)
–
Host to bind to.
-
port
(int
)
–
Port to bind to.
Source code in academy/exchange/cloud/server.py
@contextlib.asynccontextmanager
async def serve_app(
    app: Application,
    host: str,
    port: int,
) -> AsyncGenerator[None]:
    """Serve an application for the duration of an ``async with`` block.

    On entry, a runner is set up for the application and a TCP site is
    started on the given host and port. On exit, whether the body
    finished normally or raised, the runner is cleaned up.

    Args:
        app: Application to run.
        host: Host to bind to.
        port: Port to bind to.
    """
    app_runner = AppRunner(app)
    try:
        await app_runner.setup()
        listener = TCPSite(app_runner, host, port)
        await listener.start()
        logger.info('Exchange listening on %s:%s', host, port)
        yield
    finally:
        # Also reached when setup or start fails, so the runner is
        # always given the chance to release what it acquired.
        await app_runner.cleanup()
        logger.info('Exchange closed!')
|