Skip to content

academy.socket

SocketClosedError

Bases: Exception

Socket is already closed.

SocketOpenError

Bases: Exception

Failed to open socket.

SimpleSocket

SimpleSocket(
    host: str, port: int, *, timeout: float | None = None
)

Simple socket wrapper.

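Configures a client connection using a blocking TCP socket opened with socket.create_connection (the address family is whichever one the host resolves to). The send and recv methods handle message framing with a size header, chunked transfer of large messages, and reassembly of partial messages; send_string and recv_string additionally handle UTF-8 encoding and decoding.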

Note

This class can be used as a context manager.

Parameters:

  • host (str) –

    Host address to connect to.

  • port (int) –

    Port to connect to.

  • timeout (float | None, default: None ) –

    Timeout, in seconds, for establishing the connection. If None, no timeout is passed to the connection attempt.

Raises:

  • SocketOpenError

    if creating the socket fails. The __cause__ of the exception will be set to the underlying OSError.

Source code in academy/socket.py
def __init__(
    self,
    host: str,
    port: int,
    *,
    timeout: float | None = None,
) -> None:
    """Open a TCP connection to ``host:port``.

    Args:
        host: Host address to connect to.
        port: Port to connect to.
        timeout: Timeout forwarded to `socket.create_connection()`.

    Raises:
        SocketOpenError: if the connection cannot be created. The
            underlying `OSError` is attached as ``__cause__``.
    """
    self.closed = False
    self.host, self.port, self.timeout = host, port, timeout
    address = (host, port)
    try:
        connection = socket.create_connection(address, timeout=timeout)
    except OSError as error:
        # Surface every connection failure as the module's own error type.
        raise SocketOpenError() from error
    self.socket = connection

close

close(shutdown: bool = True) -> None

Close the socket.

Source code in academy/socket.py
def close(self, shutdown: bool = True) -> None:
    """Close the underlying socket; repeated calls are no-ops.

    Args:
        shutdown: When true, attempt to shut down both directions of the
            connection before closing. Errors from the shutdown attempt
            are ignored.
    """
    if not self.closed:
        if shutdown:  # pragma: no branch
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            except OSError:
                # Some platforms may raise ENOTCONN here; closing below is
                # still the right thing to do.
                pass
        self.socket.close()
        self.closed = True

send

send(message: bytes) -> None

Send bytes to the socket.

Note

This is a noop if the message is empty.

Parameters:

  • message (bytes) –

    Message to send.

Raises:

  • SocketClosedError

    if the socket was closed.

  • OSError

    if an error occurred.

Source code in academy/socket.py
def send(self, message: bytes) -> None:
    """Send one framed message over the socket.

    A header built from the message is written first, followed by the
    payload in pieces of at most ``MESSAGE_CHUNK_SIZE`` bytes.

    Note:
        Nothing (not even a header) is sent when the message is empty.

    Args:
        message: Message to send.

    Raises:
        SocketClosedError: if the socket was closed.
        OSError: if an error occurred.
    """
    total = len(message)
    if total == 0:
        return

    self._send_with_error_wrapping(_make_header(message))

    # Walk the payload in fixed-size strides; slicing clamps the last
    # piece to whatever remains.
    for offset in range(0, total, MESSAGE_CHUNK_SIZE):
        piece = message[offset : offset + MESSAGE_CHUNK_SIZE]
        self._send_with_error_wrapping(piece)

send_string

send_string(message: str) -> None

Send a string to the socket.

Strings are encoded with UTF-8.

Parameters:

  • message (str) –

    Message to send.

Raises:

  • SocketClosedError

    if the socket was closed.

  • OSError

    if an error occurred.

Source code in academy/socket.py
def send_string(self, message: str) -> None:
    """Encode a string as UTF-8 and send it as one message.

    Args:
        message: Message to send.

    Raises:
        SocketClosedError: if the socket was closed.
        OSError: if an error occurred.
    """
    encoded = message.encode('utf-8')
    self.send(encoded)

recv

recv() -> bytes | bytearray

Receive the next message from the socket.

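Returns:

  • bytes | bytearray

    Bytes containing the message.

Raises:

  • SocketClosedError

    if the socket was closed.

  • OSError

    if an error occurred.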

Source code in academy/socket.py
def recv(self) -> bytes | bytearray:
    """Receive the next framed message from the socket.

    Reads a header of ``MESSAGE_HEADER_SIZE`` bytes to learn the payload
    size, then fills a pre-sized buffer in pieces of at most
    ``MESSAGE_CHUNK_SIZE`` bytes.

    Returns:
        Bytes containing the message.

    Raises:
        SocketClosedError: if the socket was closed.
        OSError: if an error occurred.
    """
    header = _recv_from_socket(self.socket, MESSAGE_HEADER_SIZE)
    total = _get_size_from_header(header)

    payload = bytearray(total)
    remaining = total
    while remaining > 0:
        piece = _recv_from_socket(
            self.socket,
            min(remaining, MESSAGE_CHUNK_SIZE),
        )
        # Write the piece at the current fill position; advance by what
        # was actually received rather than by what was requested.
        start = total - remaining
        payload[start : start + len(piece)] = piece
        remaining -= len(piece)

    return payload

recv_string

recv_string() -> str

Receive the next message from the socket.

Returns:

  • str

    Message decoded as a UTF-8 string.

Raises:

  • SocketClosedError

    if the socket was closed.

  • OSError

    if an error occurred.

Source code in academy/socket.py
def recv_string(self) -> str:
    """Receive the next message and decode it as UTF-8.

    Returns:
        Message decoded as a UTF-8 string.

    Raises:
        SocketClosedError: if the socket was closed.
        OSError: if an error occurred.
    """
    raw = self.recv()
    return raw.decode('utf-8')

SimpleSocketServer

SimpleSocketServer(
    handler: Callable[[bytes], bytes | None],
    *,
    host: str = "0.0.0.0",
    port: int | None = None,
    timeout: float | None = 5
)

Simple asyncio TCP socket server.

Parameters:

  • handler (Callable[[bytes], bytes | None]) –

    Callback that handles a message (bytes) and returns the response as bytes, or None. The handler is called synchronously within the client handler, so it should not perform any heavy or blocking operations.

  • host (str, default: '0.0.0.0' ) –

    Host to bind to.

  • port (int | None, default: None ) –

    Port to bind to. If None, an open port is selected automatically.

  • timeout (float | None, default: 5 ) –

    Seconds to wait for the server to startup and shutdown.

Source code in academy/socket.py
def __init__(
    self,
    handler: Callable[[bytes], bytes | None],
    *,
    host: str = '0.0.0.0',
    port: int | None = None,
    timeout: float | None = 5,
) -> None:
    """Store the server configuration; nothing is bound or started yet.

    Args:
        handler: Callback that receives a message and returns the
            response bytes or None.
        host: Host to bind to.
        port: Port to bind to. If None, one is obtained from
            `open_port()`.
        timeout: Seconds to wait for the server to start up and shut down.
    """
    # User-facing configuration.
    self.handler = handler
    self.host = host
    self.port = open_port() if port is None else port
    self.timeout = timeout

    # Synchronisation between the caller's thread and the server thread.
    self._lock = threading.Lock()
    self._started = threading.Event()

    # Populated once the server is running.
    self._loop: asyncio.AbstractEventLoop | None = None
    self._thread: threading.Thread | None = None
    self._signal_stop: asyncio.Future[None] | None = None
    self._client_tasks: set[asyncio.Task[None]] = set()

serve_forever async

serve_forever(stop: Future[None]) -> None

Accept and handle connections forever.

Parameters:

  • stop (Future[None]) –

    An asyncio future that this method blocks on. Can be used to signal externally that the coroutine should exit.

Source code in academy/socket.py
async def serve_forever(self, stop: asyncio.Future[None]) -> None:
    """Accept and handle connections until ``stop`` completes.

    Starts an asyncio TCP server on ``self.host:self.port``, sets the
    ``_started`` event, and then awaits ``stop``. Once ``stop`` completes,
    every task in ``self._client_tasks`` is cancelled and awaited, the
    server context is exited, and the ``_started`` event is cleared.

    Args:
        stop: An asyncio future that this method blocks on. Can be used
            to signal externally that the coroutine should exit.
    """
    # Keep a reference so the future can be completed from outside this
    # coroutine (stop_server_thread() does so via the event loop).
    self._signal_stop = stop
    # NOTE(review): _register_client_task is defined outside this view;
    # presumably it tracks per-connection tasks in self._client_tasks.
    server = await asyncio.start_server(
        self._register_client_task,
        host=self.host,
        port=self.port,
    )
    logger.debug('TCP server listening at %s:%s', self.host, self.port)
    # Unblocks start_server_thread(), which waits on this event.
    self._started.set()

    async with server:
        # NOTE(review): asyncio.start_server() defaults to
        # start_serving=True, so this call looks redundant — confirm.
        await server.start_serving()
        # Block here until the caller signals shutdown.
        await self._signal_stop

        # Iterate over a snapshot: the set may change while tasks finish.
        for task in tuple(self._client_tasks):
            task.cancel('Server has been closed.')
            # The cancellation we just requested is expected, not an error.
            with contextlib.suppress(asyncio.CancelledError):
                await task

    # Server.close_clients() only exists on Python 3.13 and newer.
    if sys.version_info >= (3, 13):  # pragma: >=3.13 cover
        server.close_clients()
    self._started.clear()
    logger.debug('TCP server finished at %s:%s', self.host, self.port)

start_server_thread

start_server_thread() -> None

Start the server in a new thread.

Source code in academy/socket.py
def start_server_thread(self) -> None:
    """Run `serve_forever()` on a fresh event loop in a new thread.

    Blocks for up to ``self.timeout`` seconds waiting for the server to
    report that it has started. The outcome of that wait is not checked.
    """
    with self._lock:
        event_loop = asyncio.new_event_loop()
        stop_signal = event_loop.create_future()

        def _run_server() -> None:
            # The loop is created by the caller but driven entirely from
            # the server thread.
            asyncio.set_event_loop(event_loop)
            event_loop.run_until_complete(self.serve_forever(stop_signal))
            event_loop.close()

        server_thread = threading.Thread(
            target=_run_server,
            name='socket-server',
        )
        self._loop = event_loop
        self._thread = server_thread
        server_thread.start()
        self._started.wait(self.timeout)

stop_server_thread

stop_server_thread() -> None

Stop the server thread.

Source code in academy/socket.py
def stop_server_thread(self) -> None:
    """Signal the server to stop and wait for its thread to exit.

    Does nothing if no server thread is recorded.

    Raises:
        TimeoutError: if the thread is still alive after
            ``self.timeout`` seconds.
    """
    with self._lock:
        loop, thread = self._loop, self._thread
        if loop is None or thread is None:
            return
        assert self._signal_stop is not None
        # The future belongs to the server's loop, so it is completed
        # through a thread-safe callback on that loop.
        loop.call_soon_threadsafe(self._signal_stop.set_result, None)
        thread.join(timeout=self.timeout)
        if thread.is_alive():  # pragma: no cover
            raise TimeoutError(
                'Server thread did not gracefully exit '
                f'within {self.timeout}s.',
            )
        self._loop = self._thread = None

address_by_hostname

address_by_hostname() -> str

Get the IP address from the hostname of the local host.

Source code in academy/socket.py
def address_by_hostname() -> str:
    """Resolve the local host's node name to an IP address."""
    hostname = platform.node()
    return socket.gethostbyname(hostname)

address_by_interface

address_by_interface(ifname: str) -> str

Get the IP address of the given interface.

Source: https://stackoverflow.com/questions/24196932/how-can-i-get-the-ip-address-of-eth0-in-python#24196955

Parameters:

  • ifname (str) –

    Name of the interface whose address is to be returned.

Source code in academy/socket.py
def address_by_interface(ifname: str) -> str:  # pragma: darwin no cover
    """Get the IP address of the given interface.

    Source: https://stackoverflow.com/questions/24196932/how-can-i-get-the-ip-address-of-eth0-in-python#24196955

    Args:
        ifname: Name of the interface whose address is to be returned.
            Only the first 15 characters are used.

    Returns:
        The interface address in dotted-quad notation.

    Raises:
        OSError: if the ioctl request fails (for example, when no such
            interface exists).
    """
    request = struct.pack('256s', bytes(ifname[:15], 'utf-8'))
    # The socket is only a handle for the ioctl call; the context manager
    # guarantees it is closed even when the ioctl raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        response = fcntl.ioctl(
            s.fileno(),
            0x8915,  # SIOCGIFADDR
            request,
        )
    # Bytes 20..23 of the response hold the packed IPv4 address.
    return socket.inet_ntoa(response[20:24])

open_port

open_port() -> int

Return open port.

Source: https://stackoverflow.com/questions/2838244

Source code in academy/socket.py
def open_port() -> int:
    """Return open port.

    Binds a temporary socket to port 0 so the operating system picks a free
    port, then releases it. Ports already handed out (tracked in the
    module-level ``_used_ports``) are skipped.

    Source: https://stackoverflow.com/questions/2838244

    Returns:
        A port number that was free at the time of the call. The port is
        released before returning, so another process could still claim
        it before the caller binds to it.
    """
    while True:
        # The context manager closes the probe socket even if bind() or
        # listen() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(('', 0))
            s.listen(1)
            port = s.getsockname()[1]
        if port not in _used_ports:  # pragma: no branch
            _used_ports.add(port)
            return port

wait_connection

wait_connection(
    host: str,
    port: int,
    *,
    sleep: float = 0.01,
    timeout: float | None = None
) -> None

Wait for a socket connection to be established.

Repeatedly tries to open and close a socket connection to host:port. If successful, the function returns. If unsuccessful before the timeout, a TimeoutError is raised. The function will sleep for sleep seconds in between successive connection attempts.

Parameters:

  • host (str) –

    Host address to connect to.

  • port (int) –

    Host port to connect to.

  • sleep (float, default: 0.01 ) –

    Seconds to sleep after unsuccessful connections.

  • timeout (float | None, default: None ) –

    Maximum number of seconds to wait for successful connections.

Source code in academy/socket.py
def wait_connection(
    host: str,
    port: int,
    *,
    sleep: float = 0.01,
    timeout: float | None = None,
) -> None:
    """Wait for a socket connection to be established.

    Repeatedly tries to open and close a socket connection to `host:port`.
    If successful, the function returns. If unsuccessful before the timeout,
    a `TimeoutError` is raised. The function will sleep for `sleep` seconds
    in between successive connection attempts.

    Args:
        host: Host address to connect to.
        port: Host port to connect to.
        sleep: Seconds to sleep after unsuccessful connections.
        timeout: Maximum number of seconds to wait for successful connections.
    """
    sleep = min(sleep, timeout) if timeout is not None else sleep
    waited = 0.0

    while True:
        try:
            start = time.perf_counter()
            with socket.create_connection((host, port), timeout=timeout):
                break
        except OSError as e:
            connection_time = time.perf_counter() - start
            waited += connection_time
            if timeout is not None and waited >= timeout:
                raise TimeoutError from e
            time.sleep(sleep)
            waited += sleep