academy.multiplex

MailboxMultiplexer

MailboxMultiplexer(
    mailbox_id: EntityId,
    exchange: Exchange,
    request_handler: Callable[[RequestMessage], None],
)

Bases: NoPickleMixin

Multiplex a single mailbox across many consumers.

A mailbox represents a recipient entity. Often, several entities within a single process need to send and receive messages. For example, a running agent may hold multiple handles to other agents. A naive approach would give the agent and each handle their own mailbox, but this requires one listening thread per mailbox in the process, which does not scale well. The multiplexer lets multiple entities (e.g., an agent and its handles) share a single mailbox, so there is only one listening thread, and messages are dispatched to the appropriate entity (i.e., object) within the process.
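The idea can be illustrated with a minimal, self-contained sketch that does not use the academy API. A single listener thread drains one shared inbox and routes each message to a per-consumer queue. The names `SharedInbox`, `register`, and `_STOP`, as well as the `(consumer, payload)` message shape, are hypothetical and chosen only for illustration.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel used to stop the listener


class SharedInbox:
    """Toy multiplexer: one inbox and one listener thread for many consumers."""

    def __init__(self) -> None:
        self.inbox = queue.Queue()
        self.consumers = {}

    def register(self, name: str) -> queue.Queue:
        # Each consumer gets a private queue but no thread of its own.
        self.consumers[name] = queue.Queue()
        return self.consumers[name]

    def listen(self) -> None:
        # The only loop that blocks on the shared inbox.
        while True:
            item = self.inbox.get()
            if item is _STOP:
                return
            name, payload = item
            target = self.consumers.get(name)
            if target is not None:
                target.put(payload)


mux = SharedInbox()
agent_q = mux.register('agent')
handle_q = mux.register('handle')

listener = threading.Thread(target=mux.listen)
listener.start()
mux.inbox.put(('agent', 'ping'))
mux.inbox.put(('handle', 'pong'))
mux.inbox.put(_STOP)
listener.join()

agent_msg = agent_q.get_nowait()
handle_msg = handle_q.get_nowait()
```

Adding another consumer in this sketch costs one dictionary entry rather than one thread, which is the scaling argument made above.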

Note

This class should not be considered part of the public API. It is used internally by other components, such as the Agent and Manager, which use multiple handles concurrently.

Parameters:

  • mailbox_id (EntityId) –

    EntityId of the mailbox to multiplex. For example, the identifier of an agent.

  • exchange (Exchange) –

    The exchange interface managing the mailbox.

  • request_handler (Callable[[RequestMessage], None]) –

    A callable invoked when a request message is received in the mailbox.

Source code in academy/multiplex.py
def __init__(
    self,
    mailbox_id: EntityId,
    exchange: Exchange,
    request_handler: Callable[[RequestMessage], None],
) -> None:
    self.mailbox_id = mailbox_id
    self.exchange = exchange
    self.request_handler = request_handler
    self.bound_handles: dict[uuid.UUID, BoundRemoteHandle[Any]] = {}

bind

bind(
    handle: RemoteHandle[BehaviorT],
) -> BoundRemoteHandle[BehaviorT]

Bind a handle to this mailbox.

Parameters:

  • handle (RemoteHandle[BehaviorT]) –

    Remote handle to bind.

Returns:

  • BoundRemoteHandle[BehaviorT] –

    Remote handle bound to this mailbox.

Source code in academy/multiplex.py
def bind(
    self,
    handle: RemoteHandle[BehaviorT],
) -> BoundRemoteHandle[BehaviorT]:
    """Bind a handle to this mailbox.

    Args:
        handle: Remote handle to bind.

    Returns:
        Remote handle bound to this mailbox.
    """
    bound = handle.bind_to_mailbox(self.mailbox_id)
    self.bound_handles[bound.handle_id] = bound
    logger.debug(
        'Bound handle to %s to multiplexer (%s)',
        bound.agent_id,
        self,
    )
    return bound

close

close() -> None

Close the multiplexer.

Closes the mailbox, which causes listen() to return, and then closes all handles bound to this mailbox.
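The source listed for close() calls terminate() before close_bound_handles(). The following self-contained sketch shows why terminating first unblocks a listener thread. It does not use the academy API: `ToyMailbox`, `ClosedError`, and the `events` list are hypothetical stand-ins, and the `join()` call is added here only to make the ordering observable.

```python
import queue
import threading


class ClosedError(Exception):
    """Hypothetical stand-in for a mailbox-closed error."""


class ToyMailbox:
    """Hypothetical mailbox whose recv() raises once it has been terminated."""

    _CLOSED = object()

    def __init__(self) -> None:
        self._queue = queue.Queue()

    def recv(self):
        item = self._queue.get()  # blocks until a message or the close marker
        if item is self._CLOSED:
            raise ClosedError
        return item

    def terminate(self) -> None:
        # Wake up any thread blocked in recv().
        self._queue.put(self._CLOSED)


events = []
mailbox = ToyMailbox()


def listen() -> None:
    try:
        while True:
            mailbox.recv()
    except ClosedError:
        events.append('listener returned')


listener = threading.Thread(target=listen)
listener.start()

# Same order as the listed close(): terminate first, then clean up handles.
mailbox.terminate()
listener.join(timeout=5)
events.append('handles closed')  # placeholder for closing bound handles
```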

Source code in academy/multiplex.py
def close(self) -> None:
    """Close the multiplexer.

    Closes all handles bound to this mailbox and then closes the mailbox.
    """
    # This will cause listen() to return
    self.terminate()
    self.close_bound_handles()

close_bound_handles

close_bound_handles() -> None

Close all handles bound to this mailbox.

Source code in academy/multiplex.py
def close_bound_handles(self) -> None:
    """Close all handles bound to this mailbox."""
    for key in tuple(self.bound_handles):
        handle = self.bound_handles.pop(key)
        handle.close(wait_futures=False)
    logger.debug('Closed all handles bound to multiplexer (%s)', self)

terminate

terminate() -> None

Close the mailbox.

Source code in academy/multiplex.py
def terminate(self) -> None:
    """Close the mailbox."""
    self.exchange.terminate(self.mailbox_id)
    logger.debug('Closed mailbox of multiplexer (%s)', self)

listen

listen() -> None

Listen for new messages in the mailbox and process them.

Request messages are processed via the request_handler, and response messages are dispatched to the handle that created the corresponding request.

Warning

This method blocks, looping until the mailbox is closed, so it is typically run in a separate thread.

Note

Response messages intended for a handle that does not exist will be logged and discarded.
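The routing rule described above can be sketched as follows. This is an illustration under stated assumptions, not the library's `_message_handler`, whose source is not shown here. `Request`, `Response`, `ToyHandle`, and `dispatch` are hypothetical names, and the sketch assumes each response carries the identifier of the handle that issued the request.

```python
import logging
import uuid
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class Request:  # hypothetical stand-in for a request message
    body: str


@dataclass
class Response:  # hypothetical stand-in for a response message
    handle_id: uuid.UUID  # assumed: identifies the handle that sent the request
    body: str


class ToyHandle:
    """Hypothetical stand-in for a bound handle; records responses."""

    def __init__(self) -> None:
        self.handle_id = uuid.uuid4()
        self.received = []


def dispatch(message, handles, request_handler) -> bool:
    """Route one message; return False if it was discarded."""
    if isinstance(message, Request):
        request_handler(message)
        return True
    handle = handles.get(message.handle_id)
    if handle is None:
        # No such handle: log and drop the response.
        logger.warning('Discarding response for unknown handle %s', message.handle_id)
        return False
    handle.received.append(message.body)
    return True


requests = []
handle = ToyHandle()
handles = {handle.handle_id: handle}

handled_request = dispatch(Request('run'), handles, lambda m: requests.append(m.body))
handled_response = dispatch(Response(handle.handle_id, 'done'), handles, print)
discarded = dispatch(Response(uuid.uuid4(), 'lost'), handles, print)
```

The third call targets a freshly generated identifier that no handle owns, so the response is logged and dropped rather than raising an error.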

Source code in academy/multiplex.py
def listen(self) -> None:
    """Listen for new messages in the mailbox and process them.

    Request messages are processed via the `request_handler`, and response
    messages are dispatched to the handle that created the corresponding
    request.

    Warning:
        This method loops forever, until the mailbox is closed. Thus this
        method is typically run inside of a thread.

    Note:
        Response messages intended for a handle that does not exist
        will be logged and discarded.
    """
    logger.debug('Listening for messages in %s', self)
    mailbox = self.exchange.get_mailbox(self.mailbox_id)

    try:
        while True:
            message = mailbox.recv()
            self._message_handler(message)
    except MailboxClosedError:
        pass
    finally:
        mailbox.close()
        logger.debug('Finished listening for messages in %s', self)