Skip to content

academy.agent

AgentT module-attribute

AgentT = TypeVar('AgentT', bound='Agent')

Type variable bound to Agent.

Agent

Agent()

Agent base class.

An agent is composed of three parts:

  1. The agent_on_startup() and agent_on_shutdown() methods define callbacks that are invoked once at the start and end of an agent's execution, respectively. The methods should be used to initialize and cleanup stateful resources. Resource initialization should not be performed in __init__.
  2. Action methods annotated with @action are methods that other agents can invoke on this agent. An agent may also call it's own action methods as normal methods.
  3. Control loop methods annotated with @loop are executed in separate threads when the agent is executed.

The Runtime is used to execute an agent definition.

Warning

This class cannot be instantiated directly and must be subclassed.

Source code in academy/agent.py
def __init__(self) -> None:
    self.__agent_context: AgentContext[Self] | None = None
    self.__agent_run_sync_semaphore: asyncio.Semaphore | None = None

agent_context property

agent_context: AgentContext[Self]

Agent runtime context.

Raises:

agent_id property

agent_id: AgentId[Self]

Agent Id.

Raises:

agent_exchange_client property

agent_exchange_client: AgentExchangeClient[Self, Any]

Agent exchange client.

Raises:

agent_on_startup async

agent_on_startup() -> None

Callback invoked at the end of an agent's startup sequence.

Control loops will not start and action requests will not be processed until after this callback completes. Thus, it is safe to initialize resources in this callback that are needed by actions or loops.

See Runtime.run_until_complete() for more details on the startup sequence.

Source code in academy/agent.py
async def agent_on_startup(self) -> None:
    """Callback invoked at the end of an agent's startup sequence.

    Control loops will not start and action requests will not be
    processed until after this callback completes. Thus, it is safe to
    initialize resources in this callback that are needed by actions or
    loops.

    See
    [`Runtime.run_until_complete()`][academy.runtime.Runtime.run_until_complete]
    for more details on the startup sequence.
    """
    pass

agent_on_shutdown async

agent_on_shutdown() -> None

Callback invoked at the beginning of an agent's shutdown sequence.

See Runtime.run_until_complete() for more details on the shutdown sequence.

Source code in academy/agent.py
async def agent_on_shutdown(self) -> None:
    """Callback invoked at the beginning of an agent's shutdown sequence.

    See
    [`Runtime.run_until_complete()`][academy.runtime.Runtime.run_until_complete]
    for more details on the shutdown sequence.
    """
    pass

agent_run_sync async

agent_run_sync(
    function: Callable[P, R],
    /,
    *args: args,
    **kwargs: kwargs,
) -> R

Run a blocking function in separate thread.

Example
import time
from academy.agent import Agent, action

class Example(Agent):
    def blocking_call(self, value: int) -> int:
        time.sleep(10)
        return value

    @action
    async def non_blocking_call(self, value: int) -> int:
        result = await self.agent_run_sync(self.blocking_call, value)
        ...
        return result
Note

The max concurrency of the executor is configured in the RuntimeConfig. If all executor workers are busy the function will be queued and a warning will be logged.

Warning

This function does not support cancellation. For example, if you wrap this call in asyncio.wait_for() and a timeout occurs, the task wrapping the coroutine will be cancelled but the blocking function will continue running in its thread until completion.

Parameters:

  • function (Callable[P, R]) –

    The blocking function to run.

  • *args (args, default: () ) –

    Positional arguments for the function.

  • **kwargs (kwargs, default: {} ) –

    Keyword arguments for the function.

Returns:

  • R

    The result of the function call.

Raises:

Source code in academy/agent.py
async def agent_run_sync(
    self,
    function: Callable[P, R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Run a blocking function in separate thread.

    Example:
        ```python
        import time
        from academy.agent import Agent, action

        class Example(Agent):
            def blocking_call(self, value: int) -> int:
                time.sleep(10)
                return value

            @action
            async def non_blocking_call(self, value: int) -> int:
                result = await self.agent_run_sync(self.blocking_call, value)
                ...
                return result
        ```

    Note:
        The max concurrency of the executor is configured in the
        [`RuntimeConfig`][academy.runtime.RuntimeConfig]. If all
        executor workers are busy the function will be queued and a
        warning will be logged.

    Warning:
       This function does not support cancellation. For example, if you
       wrap this call in [`asyncio.wait_for()`][asyncio.wait_for] and a
       timeout occurs, the task wrapping the coroutine will be cancelled
       but the blocking function will continue running in its thread until
       completion.

    Args:
        function: The blocking function to run.
        *args: Positional arguments for the function.
        **kwargs: Keyword arguments for the function.

    Returns:
        The result of the function call.

    Raises:
        AgentNotInitializedError: If the agent runtime has not been
            started.
        Exception: Any exception raised by the function.
    """  # noqa: E501
    executor = self.agent_context.executor

    wrapped = functools.partial(function, *args, **kwargs)
    loop = asyncio.get_running_loop()

    if self.__agent_run_sync_semaphore is None:
        max_workers = executor._max_workers
        self.__agent_run_sync_semaphore = asyncio.Semaphore(max_workers)

    acquired = self.__agent_run_sync_semaphore.locked()
    if acquired:
        logger.warning(
            f'Thread-pool executor for {self.agent_id} is overloaded, '
            f'sync function "{function.__name__}" is waiting for a '
            'worker',
        )

    async with self.__agent_run_sync_semaphore:
        return await loop.run_in_executor(executor, wrapped)

agent_shutdown

agent_shutdown() -> None

Request the agent to shutdown.

Raises:

Source code in academy/agent.py
def agent_shutdown(self) -> None:
    """Request the agent to shutdown.

    Raises:
        AgentNotInitializedError: If the agent runtime implementing
            this agent has not been started.
    """
    self.agent_context.shutdown_event.set()

Action

Bases: Generic[P, R_co], Protocol

Action method protocol.

__call__ async

__call__(*arg: args, **kwargs: kwargs) -> R_co

Expected signature of methods decorated as an action.

In general, action methods can implement any signature.

Source code in academy/agent.py
async def __call__(self, *arg: P.args, **kwargs: P.kwargs) -> R_co:
    """Expected signature of methods decorated as an action.

    In general, action methods can implement any signature.
    """
    ...

ControlLoop

Bases: Protocol

Control loop method protocol.

__call__ async

__call__(shutdown: Event) -> None

Expected signature of methods decorated as a control loop.

Parameters:

  • shutdown (Event) –

    Event indicating that the agent has been instructed to shutdown and all control loops should exit.

Returns:

  • None

    Control loops should not return anything.

Source code in academy/agent.py
async def __call__(self, shutdown: asyncio.Event) -> None:
    """Expected signature of methods decorated as a control loop.

    Args:
        shutdown: Event indicating that the agent has been instructed to
            shutdown and all control loops should exit.

    Returns:
        Control loops should not return anything.
    """
    ...

action

action(
    method: ActionMethod[P, R],
) -> ActionMethod[P, R]
action(
    *,
    allow_protected_name: bool = False,
    context: bool = False
) -> Callable[[ActionMethod[P, R]], ActionMethod[P, R]]
action(
    method: ActionMethod[P, R] | None = None,
    *,
    allow_protected_name: bool = False,
    context: bool = False
) -> (
    ActionMethod[P, R]
    | Callable[[ActionMethod[P, R]], ActionMethod[P, R]]
)

Decorator that annotates a method of a agent as an action.

Marking a method of a agent as an action makes the method available to other agents. I.e., peers within a multi-agent system can only invoke methods marked as actions on each other. This enables agents to define "private" methods.

Example
from academy.agent import Agent, action
from academy.context import ActionContext

class Example(Agent):
    @action
    async def perform(self) -> ...:
        ...

    @action(context=True)
    async def perform_with_ctx(self, *, context: ActionContext) -> ...:
        ...
Warning

A warning will be emitted if the decorated method's name clashed with a method of Handle because it would not be possible to invoke this action remotely via attribute lookup on a handle. This warning can be suppressed with allow_protected_name=True, and the action must be invoked via Handle.action().

Parameters:

  • method (ActionMethod[P, R] | None, default: None ) –

    Method to decorate as an action.

  • allow_protected_name (bool, default: False ) –

    Allow decorating a method as an action when the name of the method clashes with a protected method name of Handle. This flag silences the emitted warning.

  • context (bool, default: False ) –

    Specify that the action method expects a context argument. The context will be provided at runtime as a keyword argument.

Raises:

  • TypeError

    If context=True and the method does not have a parameter named context or if context is a positional only argument.

Source code in academy/agent.py
def action(
    method: ActionMethod[P, R] | None = None,
    *,
    allow_protected_name: bool = False,
    context: bool = False,
) -> ActionMethod[P, R] | Callable[[ActionMethod[P, R]], ActionMethod[P, R]]:
    """Decorator that annotates a method of a agent as an action.

    Marking a method of a agent as an action makes the method available
    to other agents. I.e., peers within a multi-agent system can only invoke
    methods marked as actions on each other. This enables agents to
    define "private" methods.

    Example:
        ```python
        from academy.agent import Agent, action
        from academy.context import ActionContext

        class Example(Agent):
            @action
            async def perform(self) -> ...:
                ...

            @action(context=True)
            async def perform_with_ctx(self, *, context: ActionContext) -> ...:
                ...
        ```

    Warning:
        A warning will be emitted if the decorated method's name clashed
        with a method of [`Handle`][academy.handle.Handle] because it would
        not be possible to invoke this action remotely via attribute
        lookup on a handle. This warning can be suppressed with
        `allow_protected_name=True`, and the action must be invoked via
        [`Handle.action()`][academy.handle.Handle.action].

    Args:
        method: Method to decorate as an action.
        allow_protected_name: Allow decorating a method as an action when
            the name of the method clashes with a protected method name of
            [`Handle`][academy.handle.Handle]. This flag silences the
            emitted warning.
        context: Specify that the action method expects a context argument.
            The `context` will be provided at runtime as a keyword argument.

    Raises:
        TypeError: If `context=True` and the method does not have a parameter
            named `context` or if `context` is a positional only argument.
    """

    def decorator(method_: ActionMethod[P, R]) -> ActionMethod[P, R]:
        if (
            not allow_protected_name
            and method_.__name__ in _get_handle_protected_methods()
        ):
            warnings.warn(
                f'The name of the decorated method is "{method_.__name__}" '
                'which clashes with a protected method of Handle. '
                'Rename the decorated method to avoid ambiguity when remotely '
                'invoking it via a handle.',
                UserWarning,
                stacklevel=3,
            )
        # Typing the requirement that if context=True then params P should
        # contain a keyword argument named "context" is not easily annotated
        # for mypy so instead we check at runtime.
        if context:
            sig = inspect.signature(method_)
            if 'context' not in sig.parameters:
                raise TypeError(
                    f'Action method "{method_.__name__}" must accept a '
                    '"context" keyword argument when used with '
                    '@action(context=True).',
                )
            if (
                sig.parameters['context'].kind
                != inspect.Parameter.KEYWORD_ONLY
            ):
                raise TypeError(
                    'The "context" argument to action method '
                    f'"{method_.__name__}" must be a keyword only argument.',
                )

        method_._agent_method_type = 'action'  # type: ignore[attr-defined]
        method_._action_method_context = context  # type: ignore[attr-defined]
        return method_

    if method is None:
        return decorator
    else:
        return decorator(method)

loop

loop(method: LoopMethod[AgentT]) -> LoopMethod[AgentT]

Decorator that annotates a method of a agent as a control loop.

Control loop methods of a agent are run as threads when an agent starts. A control loop can run for a well-defined period of time or indefinitely, provided the control loop exits when the shutdown event, passed as a parameter to all control loop methods, is set.

Example
import asyncio
from academy.agent import Agent, loop

class Example(Agent):
    @loop
    async def listen(self, shutdown: asyncio.Event) -> None:
        while not shutdown.is_set():
            ...

Raises:

Source code in academy/agent.py
def loop(method: LoopMethod[AgentT]) -> LoopMethod[AgentT]:
    """Decorator that annotates a method of a agent as a control loop.

    Control loop methods of a agent are run as threads when an agent
    starts. A control loop can run for a well-defined period of time or
    indefinitely, provided the control loop exits when the `shutdown`
    event, passed as a parameter to all control loop methods, is set.

    Example:
        ```python
        import asyncio
        from academy.agent import Agent, loop

        class Example(Agent):
            @loop
            async def listen(self, shutdown: asyncio.Event) -> None:
                while not shutdown.is_set():
                    ...
        ```

    Raises:
        TypeError: if the method signature does not conform to the
            [`ControlLoop`][academy.agent.ControlLoop] protocol.
    """
    method._agent_method_type = 'loop'  # type: ignore[attr-defined]

    if sys.version_info >= (3, 10):  # pragma: >=3.10 cover
        found_sig = inspect.signature(method, eval_str=True)
        expected_sig = inspect.signature(ControlLoop.__call__, eval_str=True)
    else:  # pragma: <3.10 cover
        found_sig = inspect.signature(method)
        expected_sig = inspect.signature(ControlLoop.__call__)

    if found_sig != expected_sig:
        raise TypeError(
            f'Signature of loop method "{method.__name__}" is {found_sig} '
            f'but should be {expected_sig}. If the signatures look the same '
            'except that types are stringified, try importing '
            '"from __future__ import annotations" at the top of the module '
            'where the agent is defined.',
        )

    @functools.wraps(method)
    async def _wrapped(self: AgentT, shutdown: asyncio.Event) -> None:
        logger.debug('Started %r loop for %s', method.__name__, self)
        await method(self, shutdown)
        logger.debug('Exited %r loop for %s', method.__name__, self)

    return _wrapped

event

event(
    name: str,
) -> Callable[
    [Callable[[AgentT], Coroutine[None, None, None]]],
    LoopMethod[AgentT],
]

Decorator that annotates a method of a agent as an event loop.

An event loop is a special type of control loop that runs when a asyncio.Event is set. The event is cleared after the loop runs.

Example
import asyncio
from academy.agent import Agent, timer

class Example(Agent):
    def __init__(self) -> None:
        self.alert = asyncio.Event()

    @event('alert')
    async def handle(self) -> None:
        # Runs every time alter is set
        ...

Parameters:

Raises:

Source code in academy/agent.py
def event(
    name: str,
) -> Callable[
    [Callable[[AgentT], Coroutine[None, None, None]]],
    LoopMethod[AgentT],
]:
    """Decorator that annotates a method of a agent as an event loop.

    An event loop is a special type of control loop that runs when a
    [`asyncio.Event`][asyncio.Event] is set. The event is cleared
    after the loop runs.

    Example:
        ```python
        import asyncio
        from academy.agent import Agent, timer

        class Example(Agent):
            def __init__(self) -> None:
                self.alert = asyncio.Event()

            @event('alert')
            async def handle(self) -> None:
                # Runs every time alter is set
                ...
        ```

    Args:
        name: Attribute name of the [`asyncio.Event`][asyncio.Event]
            to wait on.

    Raises:
        AttributeError: Raised at runtime if no attribute named `name`
            exists on the agent.
        TypeError: Raised at runtime if the attribute named `name` is not
            a [`asyncio.Event`][asyncio.Event].
    """

    def decorator(
        method: Callable[[AgentT], Coroutine[None, None, None]],
    ) -> LoopMethod[AgentT]:
        method._agent_method_type = 'loop'  # type: ignore[attr-defined]

        @functools.wraps(method)
        async def _wrapped(self: AgentT, shutdown: asyncio.Event) -> None:
            event = getattr(self, name)
            if not isinstance(event, asyncio.Event):
                raise TypeError(
                    f'Attribute {name} of {type(self).__class__} has type '
                    f'{type(event).__class__}. Expected asyncio.Event.',
                )

            logger.debug(
                'Started %r event loop for %s (event: %r)',
                method.__name__,
                self,
                name,
            )
            while not shutdown.is_set():
                await wait_event_async(shutdown, event)
                if event.is_set():
                    try:
                        await method(self)
                    finally:
                        event.clear()
            logger.debug('Exited %r event loop for %s', method.__name__, self)

        return _wrapped

    return decorator

timer

timer(
    interval: float | timedelta,
) -> Callable[
    [Callable[[AgentT], Coroutine[None, None, None]]],
    LoopMethod[AgentT],
]

Decorator that annotates a method of a agent as a timer loop.

A timer loop is a special type of control loop that runs at a set interval. The method will always be called once before the first sleep.

Example
from academy.agent import Agent, timer

class Example(Agent):
    @timer(interval=1)
    async def listen(self) -> None:
        # Runs every 1 second
        ...

Parameters:

Source code in academy/agent.py
def timer(
    interval: float | timedelta,
) -> Callable[
    [Callable[[AgentT], Coroutine[None, None, None]]],
    LoopMethod[AgentT],
]:
    """Decorator that annotates a method of a agent as a timer loop.

    A timer loop is a special type of control loop that runs at a set
    interval. The method will always be called once before the first
    sleep.

    Example:
        ```python
        from academy.agent import Agent, timer

        class Example(Agent):
            @timer(interval=1)
            async def listen(self) -> None:
                # Runs every 1 second
                ...
        ```

    Args:
        interval: Seconds or a [`timedelta`][datetime.timedelta] to wait
            between invoking the method.
    """
    interval = (
        interval.total_seconds()
        if isinstance(interval, timedelta)
        else interval
    )

    def decorator(
        method: Callable[[AgentT], Coroutine[None, None, None]],
    ) -> LoopMethod[AgentT]:
        method._agent_method_type = 'loop'  # type: ignore[attr-defined]

        @functools.wraps(method)
        async def _wrapped(self: AgentT, shutdown: asyncio.Event) -> None:
            logger.debug(
                'Started %r timer loop for %s (interval: %fs)',
                method.__name__,
                self,
                interval,
            )
            while not shutdown.is_set():
                try:
                    await asyncio.wait_for(shutdown.wait(), timeout=interval)
                except asyncio.TimeoutError:
                    await method(self)
            logger.debug('Exited %r timer loop for %s', method.__name__, self)

        return _wrapped

    return decorator