Skip to content

academy.exchange.queue

QueueClosedError

Bases: Exception

Queue has been closed exception.

AsyncQueue

AsyncQueue()

Bases: Generic[T]

Simple async queue.

This is a simple backport of the Python 3.13 queue API, which added a shutdown method and a matching exception type.

Source code in academy/exchange/queue.py
def __init__(self) -> None:
    """Create an empty queue that is not closed."""
    # The queue starts open; `close` is what sets this flag to True.
    self._closed = False
    # Items are stored as `_Item` wrappers inside an asyncio priority
    # queue created with the default (unbounded) size.
    self._queue: asyncio.PriorityQueue[_Item[T]] = asyncio.PriorityQueue()

close async

close(immediate: bool = False) -> None

Close the queue.

This will cause get and put to raise QueueClosedError.

Parameters:

  • immediate (bool, default: False ) –

    Close the queue immediately, rather than once the queue is empty.

Source code in academy/exchange/queue.py
async def close(self, immediate: bool = False) -> None:
    """Close the queue.

    After closing, `put` raises `QueueClosedError`, and `get` raises it
    once the close marker is dequeued. Calling `close` on a queue that is
    already closed does nothing.

    Args:
        immediate: Close the queue immediately, rather than once the
            queue is empty.
    """
    if self.closed():
        # Already closed: do not enqueue a second close marker.
        return

    self._closed = True

    # The close marker travels through the same priority queue as regular
    # items; its priority decides whether it is queued at the default
    # priority or at the dedicated close priority.
    if immediate:
        marker_priority = CLOSE_PRIORITY
    else:
        marker_priority = DEFAULT_PRIORITY

    marker = _Item(marker_priority, CLOSE_SENTINEL)
    await self._queue.put(marker)

closed

closed() -> bool

Check if the queue has been closed.

Source code in academy/exchange/queue.py
def closed(self) -> bool:
    """Return whether `close` has been called on this queue."""
    is_closed = self._closed
    return is_closed

get async

get() -> T

Remove and return the next item from the queue (blocking).

Source code in academy/exchange/queue.py
async def get(self) -> T:
    """Remove and return the next item from the queue (blocking).

    Waits until an item is available. Once the close marker is dequeued,
    this call and every other pending or later call raise
    `QueueClosedError`.

    Raises:
        QueueClosedError: if the queue was closed.
    """
    item = await self._queue.get()
    if item.value is CLOSE_SENTINEL:
        # Push the sentinel back so that any other task awaiting `get`, and
        # any later call, also observes the close instead of waiting
        # forever on an empty queue. This mirrors the synchronous queue.
        # `put_nowait` cannot raise here as long as the underlying queue
        # is unbounded, which is how `__init__` constructs it.
        self._queue.put_nowait(_Item(CLOSE_PRIORITY, CLOSE_SENTINEL))
        raise QueueClosedError
    return cast(T, item.value)

put async

put(item: T) -> None

Put an item on the queue.

Source code in academy/exchange/queue.py
async def put(self, item: T) -> None:
    """Put an item on the queue.

    Args:
        item: The item to put on the queue.

    Raises:
        QueueClosedError: if the queue was closed.
    """
    already_closed = self.closed()
    if already_closed:
        raise QueueClosedError

    # Regular items are always wrapped at the default priority.
    entry = _Item(DEFAULT_PRIORITY, item)
    await self._queue.put(entry)

Queue

Queue()

Bases: Generic[T]

Simple queue.

This is a simple backport of the Python 3.13 queue API, which added a shutdown method and a matching exception type.

Source code in academy/exchange/queue.py
def __init__(self) -> None:
    """Create an empty queue that is not closed."""
    # The queue starts open; `close` is what sets this flag to True.
    self._closed = False
    # Items are stored as `_Item` wrappers inside a `queue.PriorityQueue`
    # created with the default (unbounded) size.
    self._queue: queue.PriorityQueue[_Item[T]] = queue.PriorityQueue()

close

close(immediate: bool = False) -> None

Close the queue.

This will cause get and put to raise QueueClosedError.

Parameters:

  • immediate (bool, default: False ) –

    Close the queue immediately, rather than once the queue is empty.

Source code in academy/exchange/queue.py
def close(self, immediate: bool = False) -> None:
    """Close the queue.

    After closing, `put` raises `QueueClosedError`, and `get` raises it
    once the close marker is dequeued. Calling `close` on a queue that is
    already closed does nothing.

    Args:
        immediate: Close the queue immediately, rather than once the
            queue is empty.
    """
    if self.closed():
        # Already closed: do not enqueue a second close marker.
        return

    self._closed = True

    # The close marker is queued either at the dedicated close priority
    # (immediate) or at the same priority as regular items.
    if immediate:
        marker_priority = CLOSE_PRIORITY
    else:
        marker_priority = DEFAULT_PRIORITY

    self._queue.put(_Item(marker_priority, CLOSE_SENTINEL))

closed

closed() -> bool

Check if the queue has been closed.

Source code in academy/exchange/queue.py
def closed(self) -> bool:
    """Return whether `close` has been called on this queue."""
    is_closed = self._closed
    return is_closed

get

get(timeout: float | None = None) -> T

Remove and return the next item from the queue (blocking).

Parameters:

  • timeout (float | None, default: None ) –

    Block at most timeout seconds.

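Raises:

  • TimeoutError –

    If no item was available within timeout seconds.

  • QueueClosedError –

    If the queue was closed.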

Source code in academy/exchange/queue.py
def get(self, timeout: float | None = None) -> T:
    """Remove and return the next item from the queue (blocking).

    Args:
        timeout: Block at most `timeout` seconds. The value is forwarded
            unchanged to the underlying queue's `get`.

    Raises:
        TimeoutError: if no item was available within `timeout` seconds.
        QueueClosedError: if the queue was closed.
    """
    try:
        entry = self._queue.get(timeout=timeout)
    except queue.Empty:
        # Translate the stdlib "empty" signal into a timeout and hide the
        # original exception from the traceback.
        raise TimeoutError from None

    if entry.value is not CLOSE_SENTINEL:
        return cast(T, entry.value)

    # The close marker was dequeued. Re-queue it at the close priority so
    # that other threads calling `get` also see that the queue is closed.
    self._queue.put(_Item(CLOSE_PRIORITY, CLOSE_SENTINEL))
    raise QueueClosedError

put

put(item: T) -> None

Put an item on the queue.

Parameters:

  • item (T) –

    The item to put on the queue.

Raises:

  • QueueClosedError –

    If the queue was closed.

Source code in academy/exchange/queue.py
def put(self, item: T) -> None:
    """Put an item on the queue.

    Args:
        item: The item to put on the queue.

    Raises:
        QueueClosedError: if the queue was closed.
    """
    if not self.closed():
        # Regular items are always wrapped at the default priority.
        self._queue.put(_Item(DEFAULT_PRIORITY, item))
        return
    raise QueueClosedError