academy.agent

AgentRunConfig dataclass

AgentRunConfig(
    close_exchange_on_exit: bool = True,
    max_action_concurrency: int | None = None,
    terminate_on_error: bool = True,
    terminate_on_exit: bool = True,
)

Agent run configuration.

Attributes:

  • close_exchange_on_exit (bool) –

    Close the exchange interface when the agent exits. Typically this should be True to clean up resources, except when multiple agents are running in the same process and sharing an exchange.

  • max_action_concurrency (int | None) –

    Maximum size of the thread pool used to concurrently execute action requests. If None, the pool falls back to the default worker count of ThreadPoolExecutor.

  • terminate_on_error (bool) –

    Terminate the agent by closing its mailbox permanently if the agent fails.

  • terminate_on_exit (bool) –

    Terminate the agent by closing its mailbox permanently after the agent exits.
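
As a rough illustration of how these fields might be combined, the sketch below uses a stand-in dataclass named RunConfigSketch (a hypothetical name, not part of academy) that mirrors the documented fields and defaults. In real code the class would be imported from academy.agent; the stand-in only keeps the example self-contained.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


# Hypothetical stand-in mirroring the documented AgentRunConfig fields.
@dataclass
class RunConfigSketch:
    close_exchange_on_exit: bool = True
    max_action_concurrency: int | None = None
    terminate_on_error: bool = True
    terminate_on_exit: bool = True


defaults = RunConfigSketch()

# Agents sharing one exchange in the same process: leave the exchange open
# when any single agent exits, and cap concurrent actions at four threads.
shared = replace(
    defaults,
    close_exchange_on_exit=False,
    max_action_concurrency=4,
)
```

Only the overridden fields change; the termination flags keep their defaults.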

Agent

Agent(
    behavior: BehaviorT,
    *,
    agent_id: AgentId[BehaviorT],
    exchange: Exchange,
    config: AgentRunConfig | None = None
)

Bases: Generic[BehaviorT]

Executable agent.

An agent executes a predefined Behavior. An agent can operate independently or as part of a broader multi-agent system.

Note

An agent can only be run once. After shutdown() is called, calling start() or run() again raises a RuntimeError.

Note

If any @loop method raises an error, the agent will be signaled to shut down.
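
One way to picture this error-to-shutdown path is a done-callback on the loop's future that sets a shared threading.Event. The sketch below uses only the standard library; on_loop_done, failing_loop, and the state dictionary are hypothetical names, and the agent's actual callback is not shown in this page.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor

shutdown = threading.Event()
state = {'expected_shutdown': True}


def on_loop_done(future: Future) -> None:
    # Hypothetical callback: a loop that raised marks the shutdown as
    # unexpected and signals every other loop to stop.
    if future.exception() is not None:
        state['expected_shutdown'] = False
        shutdown.set()


def failing_loop(stop: threading.Event) -> None:
    # Stand-in for a control loop that fails.
    raise ValueError('loop failed')


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(failing_loop, shutdown)
    future.add_done_callback(on_loop_done)

# The event is set by the callback, so this returns without waiting.
signaled = shutdown.wait(timeout=5)
```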

Parameters:

  • behavior (BehaviorT) –

    Behavior that the agent will exhibit.

  • agent_id (AgentId[BehaviorT]) –

    EntityId of this agent in a multi-agent system.

  • exchange (Exchange) –

    Message exchange of the multi-agent system. The agent closes the exchange when it finishes running, unless close_exchange_on_exit is False in config.

  • config (AgentRunConfig | None, default: None ) –

    Agent execution parameters.

Source code in academy/agent.py
def __init__(
    self,
    behavior: BehaviorT,
    *,
    agent_id: AgentId[BehaviorT],
    exchange: Exchange,
    config: AgentRunConfig | None = None,
) -> None:
    self.agent_id = agent_id
    self.behavior = behavior
    self.exchange = exchange
    self.config = config if config is not None else AgentRunConfig()

    self._actions = behavior.behavior_actions()
    self._loops = behavior.behavior_loops()

    self._shutdown = threading.Event()
    self._expected_shutdown = True
    self._state_lock = threading.Lock()
    self._state = _AgentState.INITIALIZED

    self._action_pool: ThreadPoolExecutor | None = None
    self._action_futures: dict[ActionRequest, Future[None]] = {}
    self._loop_pool: ThreadPoolExecutor | None = None
    self._loop_futures: dict[Future[None], str] = {}

    self._multiplexer = MailboxMultiplexer(
        self.agent_id,
        self.exchange,
        request_handler=self._request_handler,
    )

__call__

__call__() -> None

Alias for run().

Source code in academy/agent.py
def __call__(self) -> None:
    """Alias for [run()][academy.agent.Agent.run]."""
    self.run()

action

action(action: str, args: Any, kwargs: Any) -> Any

Invoke an action of the agent.

Parameters:

  • action (str) –

    Name of action to invoke.

  • args (Any) –

    Tuple of positional arguments.

  • kwargs (Any) –

    Dictionary of keyword arguments.

Returns:

  • Any

    Result of the action.

Raises:

  • AttributeError

    if an action with this name is not implemented by the behavior of the agent.
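
The lookup-then-call shape of this method can be sketched with a plain dictionary of callables. Counter and invoke below are hypothetical stand-ins rather than academy classes; note that args is unpacked as a tuple and kwargs as a dictionary.

```python
from __future__ import annotations

from typing import Any, Callable


class Counter:
    """Hypothetical behavior-like object exposing one action."""

    def __init__(self) -> None:
        self.count = 0

    def add(self, value: int, *, times: int = 1) -> int:
        self.count += value * times
        return self.count


def invoke(
    actions: dict[str, Callable[..., Any]],
    action: str,
    args: Any,
    kwargs: Any,
) -> Any:
    # Unknown names raise AttributeError; known names are called with the
    # positional tuple and keyword dictionary unpacked.
    if action not in actions:
        raise AttributeError(f'No action named "{action}".')
    return actions[action](*args, **kwargs)


counter = Counter()
actions = {'add': counter.add}
result = invoke(actions, 'add', (2,), {'times': 3})
```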

Source code in academy/agent.py
def action(self, action: str, args: Any, kwargs: Any) -> Any:
    """Invoke an action of the agent.

    Args:
        action: Name of action to invoke.
        args: Tuple of positional arguments.
        kwargs: Dictionary of keyword arguments.

    Returns:
        Result of the action.

    Raises:
        AttributeError: if an action with this name is not implemented by
            the behavior of the agent.
    """
    logger.debug('Invoking "%s" action on %s', action, self.agent_id)
    if action not in self._actions:
        raise AttributeError(
            f'Agent[{type(self.behavior).__name__}] does not have an '
            f'action named "{action}".',
        )
    return self._actions[action](*args, **kwargs)

run

run() -> None

Run the agent.

Starts the agent, waits for another thread to call signal_shutdown(), and then shuts down the agent.

Raises:

  • Exception

    Any exceptions raised inside threads.
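
Because run() blocks until a shutdown is signaled, it is typically driven from one thread and stopped from another. The sketch below reproduces that start / wait / shutdown shape with a hypothetical RunnerSketch class built on threading.Event; it is not the Agent class itself.

```python
from __future__ import annotations

import threading


class RunnerSketch:
    """Hypothetical stand-in showing the start / wait / shutdown shape."""

    def __init__(self) -> None:
        self._shutdown = threading.Event()
        self.events: list[str] = []

    def start(self) -> None:
        self.events.append('start')

    def shutdown(self) -> None:
        self.events.append('shutdown')

    def signal_shutdown(self) -> None:
        self._shutdown.set()

    def run(self) -> None:
        try:
            self.start()
            self._shutdown.wait()  # Blocks until another thread signals.
        finally:
            self.shutdown()  # Always runs, even if start() raised.


runner = RunnerSketch()
thread = threading.Thread(target=runner.run)
thread.start()
runner.signal_shutdown()  # Called from the main thread.
thread.join(timeout=5)
```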

Source code in academy/agent.py
def run(self) -> None:
    """Run the agent.

    Starts the agent, waits for another thread to call `signal_shutdown()`,
    and then shuts down the agent.

    Raises:
        Exception: Any exceptions raised inside threads.
    """
    try:
        self.start()
        self._shutdown.wait()
    finally:
        self.shutdown()

start

start() -> None

Start the agent.

Note

This method is idempotent; it will return if the agent is already running. However, it will raise an error if the agent has been shut down.

  1. Binds all unbound handles to remote agents to this agent.
  2. Calls Behavior.on_setup().
  3. Starts threads for all control loops defined on the agent's Behavior.
  4. Starts a thread for listening to messages from the Exchange.

Raises:

  • RuntimeError

    If the agent has been shut down.

Source code in academy/agent.py
def start(self) -> None:
    """Start the agent.

    Note:
        This method is idempotent; it will return if the agent is
        already running. However, it will raise an error if the agent
        is shutdown.

    1. Binds all unbound handles to remote agents to this agent.
    1. Calls [`Behavior.on_setup()`][academy.behavior.Behavior.on_setup].
    1. Starts threads for all control loops defined on the agent's
       [`Behavior`][academy.behavior.Behavior].
    1. Starts a thread for listening to messages from the
       [`Exchange`][academy.exchange.Exchange] (if provided).

    Raises:
        RuntimeError: If the agent has been shutdown.
    """
    with self._state_lock:
        if self._state is _AgentState.SHUTDOWN:
            raise RuntimeError('Agent has already been shutdown.')
        elif self._state is _AgentState.RUNNING:
            return

        logger.debug(
            'Starting agent... (%s; %s)',
            self.agent_id,
            self.behavior,
        )
        self._state = _AgentState.STARTING
        self._bind_handles()
        self.behavior.on_setup()
        self._action_pool = ThreadPoolExecutor(
            self.config.max_action_concurrency,
        )
        self._loop_pool = ThreadPoolExecutor(
            max_workers=len(self._loops) + 1,
        )

        for name, method in self._loops.items():
            loop_future = self._loop_pool.submit(method, self._shutdown)
            self._loop_futures[loop_future] = name
            loop_future.add_done_callback(self._loop_callback)

        listener_future = self._loop_pool.submit(self._multiplexer.listen)
        self._loop_futures[listener_future] = '_multiplexer.listen'

        self._state = _AgentState.RUNNING

        logger.info('Running agent (%s; %s)', self.agent_id, self.behavior)
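
The idempotency and run-once guarantees both come from checking a state value under a lock. A minimal sketch of that pattern, assuming a reduced three-state lifecycle and hypothetical names (State, LifecycleSketch), might look like this:

```python
import enum
import threading


class State(enum.Enum):
    INITIALIZED = 'initialized'
    RUNNING = 'running'
    SHUTDOWN = 'shutdown'


class LifecycleSketch:
    """Hypothetical stand-in for the lock-guarded state checks."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._state = State.INITIALIZED
        self.setup_calls = 0

    def start(self) -> None:
        with self._lock:
            if self._state is State.SHUTDOWN:
                raise RuntimeError('Already shut down.')
            if self._state is State.RUNNING:
                return  # Idempotent: a second call does nothing.
            self.setup_calls += 1
            self._state = State.RUNNING

    def shutdown(self) -> None:
        with self._lock:
            if self._state is State.SHUTDOWN:
                return  # Idempotent as well.
            self._state = State.SHUTDOWN


lifecycle = LifecycleSketch()
lifecycle.start()
lifecycle.start()  # No second setup.
lifecycle.shutdown()
```

Calling lifecycle.start() after this point raises RuntimeError, which matches the run-once rule described for the agent.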

shutdown

shutdown() -> None

Shut down the agent.

Note

This method is idempotent.

  1. Sets the shutdown Event passed to all control loops.
  2. Closes the agent's mailbox, indicating that no further messages will be processed, and waits for the message listener thread to exit.
  3. Waits for any currently executing actions to complete.
  4. Waits for the control loop threads to exit.
  5. Calls Behavior.on_shutdown().
  6. Optionally closes the exchange.

Raises:

  • Exception

    Any exceptions raised inside threads.
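
Waiting for executing actions relies on ThreadPoolExecutor.shutdown(wait=True, cancel_futures=True), which lets running work finish while cancelling work that is still queued. The standalone sketch below demonstrates that standard-library behavior (Python 3.9 or later); slow_action is a hypothetical stand-in for an action.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()


def slow_action() -> str:
    # Stand-in for an action that is mid-execution when shutdown begins.
    started.set()
    time.sleep(0.2)
    return 'done'


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(slow_action)
started.wait(timeout=5)  # The single worker is now busy.
queued = pool.submit(lambda: 'never runs')  # Stays in the queue.

# Blocks until the running action completes; the queued one is cancelled.
pool.shutdown(wait=True, cancel_futures=True)
```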

Source code in academy/agent.py
def shutdown(self) -> None:
    """Shutdown the agent.

    Note:
        This method is idempotent.

    1. Sets the shutdown [`Event`][threading.Event] passed to all control
       loops.
    1. Closes the agent's mailbox indicating that no further messages
       will be processed, and waits for the message listener thread to
       exit.
    1. Waits for any currently executing actions to complete.
    1. Waits for the control loop threads to exit.
    1. Calls
       [`Behavior.on_shutdown()`][academy.behavior.Behavior.on_shutdown].
    1. Optionally closes the exchange.

    Raises:
        Exception: Any exceptions raised inside threads.
    """
    with self._state_lock:
        if self._state is _AgentState.SHUTDOWN:
            return

        logger.debug(
            'Shutting down agent... (expected: %s; %s; %s)',
            self._expected_shutdown,
            self.agent_id,
            self.behavior,
        )
        self._state = _AgentState.TERMINTATING
        self._shutdown.set()

        # Cause the multiplexer message listener thread to exit by closing
        # the mailbox the multiplexer is listening to. This is done
        # first so we stop receiving new requests.
        self._multiplexer.terminate()
        for future, name in self._loop_futures.items():
            if name == '_multiplexer.listen':
                future.result()

        # Wait for currently running actions to complete. No more
        # should come in now that multiplexer's listener thread is done.
        if self._action_pool is not None:
            self._action_pool.shutdown(wait=True, cancel_futures=True)

        # Shutdown the loop pool after waiting on the loops to exit.
        if self._loop_pool is not None:
            self._loop_pool.shutdown(wait=True)

        if (
            self._expected_shutdown and not self.config.terminate_on_exit
        ) or (
            not self._expected_shutdown
            and not self.config.terminate_on_error
        ):
            # TODO: This is a hack because we need to close the mailbox
            # for the multiplexer listener thread to exit, but in some
            # cases we don't actually want to close it permanently. This
            # means there is a race where the mailbox is temporarily
            # closed.
            self.exchange.register_agent(
                type(self.behavior),
                agent_id=self.agent_id,
            )

        self.behavior.on_shutdown()

        # Close the exchange last since the actions that finished
        # up may still need to use it to send replies.
        if self.config.close_exchange_on_exit:
            self.exchange.close()

        self._state = _AgentState.SHUTDOWN

        # Raise any exceptions from the loop threads as the final step.
        _raise_future_exceptions(tuple(self._loop_futures))

        logger.info(
            'Shutdown agent (%s; %s)',
            self.agent_id,
            self.behavior,
        )

signal_shutdown

signal_shutdown(expected: bool = True) -> None

Signal that the agent should exit.

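If the agent has not started, this will cause the agent to shut down immediately when next started. If the agent is already shut down, this has no effect. Pass expected=False to mark the shutdown as caused by an error, in which case terminate_on_error rather than terminate_on_exit decides whether the mailbox is closed permanently.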

Source code in academy/agent.py
def signal_shutdown(self, expected: bool = True) -> None:
    """Signal that the agent should exit.

    If the agent has not started, this will cause the agent to immediately
    shutdown when next started. If the agent is shutdown, this has no
    effect.
    """
    self._expected_shutdown = expected
    self._shutdown.set()
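
The expected flag recorded here is read later in shutdown() to choose between terminate_on_exit and terminate_on_error. The sketch below isolates that boolean condition in a hypothetical helper, mailbox_stays_open, so the four combinations are easy to check; it copies the condition from the shutdown() source and is not a function in academy.

```python
def mailbox_stays_open(
    expected: bool,
    terminate_on_exit: bool,
    terminate_on_error: bool,
) -> bool:
    # Same condition that guards the mailbox re-registration in shutdown():
    # an expected exit consults terminate_on_exit, and an unexpected one
    # consults terminate_on_error.
    return (expected and not terminate_on_exit) or (
        not expected and not terminate_on_error
    )


# Expected exit with default config: the mailbox is closed permanently.
default_exit = mailbox_stays_open(True, True, True)

# Unexpected exit with terminate_on_error=False: the mailbox is kept.
kept_after_error = mailbox_stays_open(False, True, False)
```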