academy.manager¶
Manager
¶
Manager(
exchange_client: UserExchangeClient[ExchangeTransportT],
executors: Executor | MutableMapping[str, Executor],
*,
default_executor: str | None = None,
max_retries: int = 0
)
Bases: Generic[ExchangeTransportT]
, NoPickleMixin
Launch and manage running agents.
A manager is used to launch agents using one or more
Executors
and interact with/manage those
agents.
Tip
This class can be used as a context manager. Upon exiting the context, running agents will be shutdown, any agent handles created by the manager will be closed, and the executors will be shutdown.
Tip
When using
ProcessPoolExecutors
,
use the initializer
argument to configure logging in the worker
processes that will execute agents.
Note
The manager takes ownership of the exchange client and executors, meaning the manager will clean up those resources when the manager is closed.
Parameters:
-
exchange_client
(UserExchangeClient[ExchangeTransportT]
) –Exchange client.
-
executors
(Executor | MutableMapping[str, Executor]
) –An executor instance or mapping of names to executors to use to run agents. If a single executor is provided, it is set as the default executor with name
'default'
, overriding any value ofdefault_executor
. -
default_executor
(str | None
, default:None
) –Specify the name of the default executor to use when not specified in
launch()
. -
max_retries
(int
, default:0
) –Maximum number of times to retry running an agent if it exits with an error.
Raises:
-
ValueError
–If
default_executor
is specified but does not exist inexecutors
.
Source code in academy/manager.py
exchange_client
property
¶
exchange_client: UserExchangeClient[ExchangeTransportT]
User client for the exchange.
exchange_factory
property
¶
exchange_factory: ExchangeFactory[ExchangeTransportT]
Client factory for the exchange.
from_exchange_factory
async
classmethod
¶
from_exchange_factory(
factory: ExchangeFactory[ExchangeTransportT],
executors: Executor | MutableMapping[str, Executor],
*,
default_executor: str | None = None,
max_retries: int = 0
) -> Self
Instantiate a new exchange client and manager from a factory.
Source code in academy/manager.py
close
async
¶
Shutdown the manager and cleanup resources.
- Request all running agents to shut down.
- Wait for all running agents to shut down.
- Close the exchange client.
- Shutdown the executors.
- Raise an exceptions returned by agents.
Raises:
-
Exception
–Any exceptions raised by agents.
Source code in academy/manager.py
add_executor
¶
Add an executor to the manager.
Note
It is not possible to remove an executor as this could create complications if an agent is already running in that executor.
Parameters:
-
name
(str
) –Name of the executor used when launching agents.
-
executor
(Executor
) –Executor instance.
Returns:
-
Self
–Self for chaining.
Raises:
-
ValueError
–If an executor with
name
already exists.
Source code in academy/manager.py
set_default_executor
¶
set_default_executor(name: str | None) -> Self
Set the default executor by name.
Parameters:
-
name
(str | None
) –Name of the executor to use as default. If
None
, no default executor is set and all calls tolaunch()
must specify the executor.
Returns:
-
Self
–Self for chaining.
Raises:
-
ValueError
–If no executor with
name
exists.
Source code in academy/manager.py
launch
async
¶
launch(
agent: AgentT | type[AgentT],
*,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
config: RuntimeConfig | None = None,
executor: str | None = None,
name: str | None = None,
registration: AgentRegistration[AgentT] | None = None
) -> RemoteHandle[AgentT]
Launch a new agent with a specified agent.
Parameters:
-
agent
(AgentT | type[AgentT]
) –Agent instance the agent will implement or the agent type that will be initialized on the worker using
args
andkwargs
. -
args
(tuple[Any, ...] | None
, default:None
) –Positional arguments used to initialize the agent. Ignored if
agent
is already an instance. -
kwargs
(dict[str, Any] | None
, default:None
) –Keyword arguments used to initialize the agent. Ignored if
agent
is already an instance. -
config
(RuntimeConfig | None
, default:None
) –Agent run configuration.
-
executor
(str | None
, default:None
) –Name of the executor instance to use. If
None
, uses the default executor, if specified, otherwise raises an error. -
name
(str | None
, default:None
) –Readable name of the agent used when registering a new agent.
-
registration
(AgentRegistration[AgentT] | None
, default:None
) –If
None
, a new agent will be registered with the exchange.
Returns:
-
RemoteHandle[AgentT]
–Handle (client bound) used to interact with the agent.
Raises:
-
RuntimeError
–If
registration
is provided and an agent with that ID has already been executed. -
ValueError
–If no default executor is set and
executor
is not specified.
Source code in academy/manager.py
get_handle
¶
get_handle(
agent: AgentId[AgentT] | AgentRegistration[AgentT],
) -> RemoteHandle[AgentT]
Create a new handle to an agent.
A handle acts like a reference to a remote agent, enabling a user to manage the agent or asynchronously invoke actions.
Parameters:
-
agent
(AgentId[AgentT] | AgentRegistration[AgentT]
) –Agent ID or registration indicating the agent to create a handle to. The agent must be registered with the same exchange that this manager is a client of.
Returns:
-
RemoteHandle[AgentT]
–Handle to the agent.
Source code in academy/manager.py
register_agent
async
¶
register_agent(
agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]
Register a new agent with the exchange.
Parameters:
-
agent
(type[AgentT]
) –Agent type of the agent.
-
name
(str | None
, default:None
) –Optional display name for the agent.
Returns:
-
AgentRegistration[AgentT]
–Agent registration info that can be passed to
-
AgentRegistration[AgentT]
–
Source code in academy/manager.py
running
¶
Get a set of IDs of all running agents.
Returns:
-
set[AgentId[Any]]
–Set of agent IDs corresponding to all agents launched by this manager that have not completed yet.
Source code in academy/manager.py
shutdown
async
¶
shutdown(
agent: AgentId[Any] | RemoteHandle[Any],
*,
blocking: bool = True,
raise_error: bool = True,
terminate: bool | None = None,
timeout: float | None = None
) -> None
Shutdown a launched agent.
Parameters:
-
agent
(AgentId[Any] | RemoteHandle[Any]
) –ID or handle to the launched agent.
-
blocking
(bool
, default:True
) –Wait for the agent to exit before returning.
-
raise_error
(bool
, default:True
) –Raise the error returned by the agent if
blocking=True
. -
terminate
(bool | None
, default:None
) –Override the termination agent of the agent defined in the
RuntimeConfig
. -
timeout
(float | None
, default:None
) –Optional timeout is seconds when
blocking=True
.
Raises:
-
BadEntityIdError
–If an agent with
agent_id
was not launched by this manager. -
TimeoutError
–If
timeout
was exceeded while blocking for agent.
Source code in academy/manager.py
wait
async
¶
wait(
agents: Iterable[AgentId[Any] | RemoteHandle[Any]],
*,
raise_error: bool = False,
return_when: str = ALL_COMPLETED,
timeout: float | None = None
) -> None
Wait for launched agents to complete.
Note
Calling wait()
is only valid after launch()
has succeeded.
Parameters:
-
agents
(Iterable[AgentId[Any] | RemoteHandle[Any]]
) –An iterable of agent IDs or handles to wait on.
-
raise_error
(bool
, default:False
) –Raise errors returned by completed agents.
-
return_when
(str
, default:ALL_COMPLETED
) –Indicate when this function should return. The same as
asyncio.wait()
. -
timeout
(float | None
, default:None
) –Optional timeout in seconds to wait for agents.
Raises:
-
BadEntityIdError
–If an agent was not launched by this manager.
-
TimeoutError
–If
timeout
was exceeded while waiting for agents. -
Exception
–Any exception raised by an agent that completed due to a failure and
raise_error=True
is set.