From b60728db702f419e4d94d66d441a9cd03fc09f28 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Tue, 14 Jan 2025 22:15:09 -0600 Subject: [PATCH 1/9] Implement highlevel unix socket listeners --- src/trio/__init__.py | 5 + src/trio/_highlevel_open_unix_listeners.py | 173 +++++++++++++++++++++ src/trio/_highlevel_socket.py | 78 +++++++++- 3 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 src/trio/_highlevel_open_unix_listeners.py diff --git a/src/trio/__init__.py b/src/trio/__init__.py index 34fda8452..8f3d25350 100644 --- a/src/trio/__init__.py +++ b/src/trio/__init__.py @@ -63,11 +63,16 @@ serve_tcp as serve_tcp, ) from ._highlevel_open_tcp_stream import open_tcp_stream as open_tcp_stream +from ._highlevel_open_unix_listeners import ( + open_unix_listener as open_unix_listener, + serve_unix as serve_unix, +) from ._highlevel_open_unix_stream import open_unix_socket as open_unix_socket from ._highlevel_serve_listeners import serve_listeners as serve_listeners from ._highlevel_socket import ( SocketListener as SocketListener, SocketStream as SocketStream, + UnixSocketListener as UnixSocketListener, ) from ._highlevel_ssl_helpers import ( open_ssl_over_tcp_listeners as open_ssl_over_tcp_listeners, diff --git a/src/trio/_highlevel_open_unix_listeners.py b/src/trio/_highlevel_open_unix_listeners.py new file mode 100644 index 000000000..86fbddbdc --- /dev/null +++ b/src/trio/_highlevel_open_unix_listeners.py @@ -0,0 +1,173 @@ +from __future__ import annotations + +import os +import sys +from typing import TYPE_CHECKING + +import trio +import trio.socket as tsocket +from trio import TaskStatus + +if TYPE_CHECKING: + from collections.abc import Awaitable, Callable + + +try: + from trio.socket import AF_UNIX + + HAS_UNIX = True +except ImportError: + HAS_UNIX = False + + +# Default backlog size: +# +# Having the backlog too low can cause practical problems (a perfectly healthy +# service that starts failing to accept connections if they arrive in a +# burst). +# +# Having it too high doesn't really cause any problems. Like any buffer, you +# want backlog queue to be zero usually, and it won't save you if you're +# getting connection attempts faster than you can call accept() on an ongoing +# basis. But unlike other buffers, this one doesn't really provide any +# backpressure. If a connection gets stuck waiting in the backlog queue, then +# from the peer's point of view the connection succeeded but then their +# send/recv will stall until we get to it, possibly for a long time. OTOH if +# there isn't room in the backlog queue, then their connect stalls, possibly +# for a long time, which is pretty much the same thing. +# +# A large backlog can also use a bit more kernel memory, but this seems fairly +# negligible these days. +# +# So this suggests we should make the backlog as large as possible. This also +# matches what Golang does. However, they do it in a weird way, where they +# have a bunch of code to sniff out the configured upper limit for backlog on +# different operating systems. But on every system, passing in a too-large +# backlog just causes it to be silently truncated to the configured maximum, +# so this is unnecessary -- we can just pass in "infinity" and get the maximum +# that way. (Verified on Windows, Linux, macOS using +# https://github.com/python-trio/trio/wiki/notes-to-self#measure-listen-backlogpy +def _compute_backlog(backlog: int | None) -> int: + # Many systems (Linux, BSDs, ...) store the backlog in a uint16 and are + # missing overflow protection, so we apply our own overflow protection. + # https://github.com/golang/go/issues/5030 + if not isinstance(backlog, int) and backlog is not None: + raise TypeError(f"backlog must be an int or None, not {backlog!r}") + if backlog is None: + return 0xFFFF + return min(backlog, 0xFFFF) + + +async def open_unix_listener( + path: str | bytes | os.PathLike[str] | os.PathLike[bytes], + *, + mode: int | None = None, # 0o666, + backlog: int | None = None, +) -> trio.UnixSocketListener: + """Create :class:`SocketListener` objects to listen for connections. + Opens a connection to the specified + `Unix domain socket `__. + + You must have read/write permission on the specified file to connect. + + Args: + + path (str): Filename of UNIX socket to create and listen on. + Absolute or relative paths may be used. + + mode (int or None): The socket file permissions. + UNIX permissions are usually specified in octal numbers. + If you leave this as ``None``, Trio will not change the mode from + the operating system's default. + + backlog (int or None): The listen backlog to use. If you leave this as + ``None`` then Trio will pick a good default. (Currently: whatever + your system has configured as the maximum backlog.) + + Returns: + :class:`UnixSocketListener` + + Raises: + :class:`TypeError` if invalid arguments. + :class:`RuntimeError`: If AF_UNIX sockets are not supported. + """ + if not HAS_UNIX: + raise RuntimeError("Unix sockets are not supported on this platform") + + computed_backlog = _compute_backlog(backlog) + + fspath = await trio.Path(os.fsdecode(path)).absolute() + + folder = fspath.parent + if not await folder.exists(): + raise FileNotFoundError(f"Socket folder does not exist: {folder!r}") + + # much more simplified logic vs tcp sockets - one socket type and only one + # possible location to connect to + sock = tsocket.socket(AF_UNIX, tsocket.SOCK_STREAM) + try: + # See https://github.com/python-trio/trio/issues/39 + if sys.platform != "win32": + sock.setsockopt(tsocket.SOL_SOCKET, tsocket.SO_REUSEADDR, 1) + + await sock.bind(str(fspath)) + + sock.listen(computed_backlog) + + if mode is not None: + await fspath.chmod(mode) + + return trio.UnixSocketListener(sock) + except BaseException as exc: + sock.close() + try: + os.unlink(str(fspath)) + except BaseException as exc_2: + raise exc_2 from exc + raise + + +async def serve_unix( + handler: Callable[[trio.SocketStream], Awaitable[object]], + path: str | bytes | os.PathLike[str] | os.PathLike[bytes], + *, + backlog: int | None = None, + handler_nursery: trio.Nursery | None = None, + task_status: TaskStatus[list[trio.UnixSocketListener]] = trio.TASK_STATUS_IGNORED, +) -> None: + """Listen for incoming UNIX connections, and for each one start a task + running ``handler(stream)``. + This is a thin convenience wrapper around :func:`open_unix_listener` and + :func:`serve_listeners` – see them for full details. + .. warning:: + If ``handler`` raises an exception, then this function doesn't do + anything special to catch it – so by default the exception will + propagate out and crash your server. If you don't want this, then catch + exceptions inside your ``handler``, or use a ``handler_nursery`` object + that responds to exceptions in some other way. + When used with ``nursery.start`` you get back the newly opened listeners. + Args: + handler: The handler to start for each incoming connection. Passed to + :func:`serve_listeners`. + path: The socket file name. + Passed to :func:`open_unix_listener`. + backlog: The listen backlog, or None to have a good default picked. + Passed to :func:`open_tcp_listener`. + handler_nursery: The nursery to start handlers in, or None to use an + internal nursery. Passed to :func:`serve_listeners`. + task_status: This function can be used with ``nursery.start``. + Returns: + This function only returns when cancelled. + Raises: + RuntimeError: If AF_UNIX sockets are not supported. + """ + if not HAS_UNIX: + raise RuntimeError("Unix sockets are not supported on this platform") + + listener = await open_unix_listener(path, backlog=backlog) + await trio.serve_listeners( + handler, + [listener], + handler_nursery=handler_nursery, + task_status=task_status, + ) diff --git a/src/trio/_highlevel_socket.py b/src/trio/_highlevel_socket.py index c04e66e1b..38b4cafd7 100644 --- a/src/trio/_highlevel_socket.py +++ b/src/trio/_highlevel_socket.py @@ -3,7 +3,8 @@ import errno from contextlib import contextmanager, suppress -from typing import TYPE_CHECKING, overload +from os import unlink +from typing import TYPE_CHECKING, Final, overload import trio @@ -31,6 +32,8 @@ errno.ENOTSOCK, } +HAS_UNIX: Final = hasattr(tsocket, "AF_UNIX") + @contextmanager def _translate_socket_errors_to_stream_errors() -> Generator[None, None, None]: @@ -412,3 +415,76 @@ async def aclose(self) -> None: """Close this listener and its underlying socket.""" self.socket.close() await trio.lowlevel.checkpoint() + + +@final +class UnixSocketListener(Listener[SocketStream]): + """A :class:`~trio.abc.Listener` that uses a listening socket to accept + incoming connections as :class:`SocketStream` objects. + + Args: + socket: The Trio socket object to wrap. Must have type ``SOCK_STREAM``, + and be listening. + + Note that the :class:`UnixSocketListener` "takes ownership" of the given + socket; closing the :class:`UnixSocketListener` will also close the socket + and unlink its associated file. + + .. attribute:: socket + + The Trio socket object that this stream wraps. + + """ + + def __init__(self, socket: SocketType) -> None: + if not HAS_UNIX: + raise RuntimeError("Unix sockets are not supported on this platform") + if not isinstance(socket, tsocket.SocketType): + raise TypeError("SocketListener requires a Trio socket object") + if socket.type != tsocket.SOCK_STREAM: + raise ValueError("SocketListener requires a SOCK_STREAM socket") + try: + listening = socket.getsockopt(tsocket.SOL_SOCKET, tsocket.SO_ACCEPTCONN) + except OSError: + # SO_ACCEPTCONN fails on macOS; we just have to trust the user. + pass + else: + if not listening: + raise ValueError("SocketListener requires a listening socket") + + self.socket = socket + + async def accept(self) -> SocketStream: + """Accept an incoming connection. + + Returns: + :class:`SocketStream` + + Raises: + OSError: if the underlying call to ``accept`` raises an unexpected + error. + ClosedResourceError: if you already closed the socket. + + This method handles routine errors like ``ECONNABORTED``, but passes + other errors on to its caller. In particular, it does *not* make any + special effort to handle resource exhaustion errors like ``EMFILE``, + ``ENFILE``, ``ENOBUFS``, ``ENOMEM``. + + """ + while True: + try: + sock, _ = await self.socket.accept() + except OSError as exc: + if exc.errno in _closed_stream_errnos: + raise trio.ClosedResourceError from None + if exc.errno not in _ignorable_accept_errnos: + raise + else: + return SocketStream(sock) + + async def aclose(self) -> None: + """Close this listener, its underlying socket, and unlink its associated file.""" + path = self.socket.getsockname() + self.socket.close() + unlink(path) + await trio.lowlevel.checkpoint() From b916c3f8fbe225bfb8070ec4f80049f28a4d627c Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Tue, 14 Jan 2025 22:26:03 -0600 Subject: [PATCH 2/9] Add newsfragment --- newsfragments/3187.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/3187.feature.rst diff --git a/newsfragments/3187.feature.rst b/newsfragments/3187.feature.rst new file mode 100644 index 000000000..7c5820e11 --- /dev/null +++ b/newsfragments/3187.feature.rst @@ -0,0 +1 @@ +Add ``trio.open_unix_listener``, ``trio.serve_unix``, and ``trio.UnixSocketListener`` to support ``SOCK_STREAM`` `Unix domain sockets `__ From c922a52f7eba5c02c5e049cc94bfdb0a6e538436 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Tue, 14 Jan 2025 23:27:08 -0600 Subject: [PATCH 3/9] Add path functionality to `SocketListener` instead of making `UnixSocketListener` --- .../{3187.feature.rst => 279.feature.rst} | 0 src/trio/__init__.py | 1 - src/trio/_highlevel_open_unix_listeners.py | 79 ++++----------- src/trio/_highlevel_socket.py | 98 +++++-------------- 4 files changed, 43 insertions(+), 135 deletions(-) rename newsfragments/{3187.feature.rst => 279.feature.rst} (100%) diff --git a/newsfragments/3187.feature.rst b/newsfragments/279.feature.rst similarity index 100% rename from newsfragments/3187.feature.rst rename to newsfragments/279.feature.rst diff --git a/src/trio/__init__.py b/src/trio/__init__.py index 8f3d25350..fc6fd90ab 100644 --- a/src/trio/__init__.py +++ b/src/trio/__init__.py @@ -72,7 +72,6 @@ from ._highlevel_socket import ( SocketListener as SocketListener, SocketStream as SocketStream, - UnixSocketListener as UnixSocketListener, ) from ._highlevel_ssl_helpers import ( open_ssl_over_tcp_listeners as open_ssl_over_tcp_listeners, diff --git a/src/trio/_highlevel_open_unix_listeners.py b/src/trio/_highlevel_open_unix_listeners.py index 86fbddbdc..aad3b8301 100644 --- a/src/trio/_highlevel_open_unix_listeners.py +++ b/src/trio/_highlevel_open_unix_listeners.py @@ -1,13 +1,14 @@ from __future__ import annotations import os -import sys from typing import TYPE_CHECKING import trio import trio.socket as tsocket from trio import TaskStatus +from ._highlevel_open_tcp_listeners import _compute_backlog + if TYPE_CHECKING: from collections.abc import Awaitable, Callable @@ -20,50 +21,12 @@ HAS_UNIX = False -# Default backlog size: -# -# Having the backlog too low can cause practical problems (a perfectly healthy -# service that starts failing to accept connections if they arrive in a -# burst). -# -# Having it too high doesn't really cause any problems. Like any buffer, you -# want backlog queue to be zero usually, and it won't save you if you're -# getting connection attempts faster than you can call accept() on an ongoing -# basis. But unlike other buffers, this one doesn't really provide any -# backpressure. If a connection gets stuck waiting in the backlog queue, then -# from the peer's point of view the connection succeeded but then their -# send/recv will stall until we get to it, possibly for a long time. OTOH if -# there isn't room in the backlog queue, then their connect stalls, possibly -# for a long time, which is pretty much the same thing. -# -# A large backlog can also use a bit more kernel memory, but this seems fairly -# negligible these days. -# -# So this suggests we should make the backlog as large as possible. This also -# matches what Golang does. However, they do it in a weird way, where they -# have a bunch of code to sniff out the configured upper limit for backlog on -# different operating systems. But on every system, passing in a too-large -# backlog just causes it to be silently truncated to the configured maximum, -# so this is unnecessary -- we can just pass in "infinity" and get the maximum -# that way. (Verified on Windows, Linux, macOS using -# https://github.com/python-trio/trio/wiki/notes-to-self#measure-listen-backlogpy -def _compute_backlog(backlog: int | None) -> int: - # Many systems (Linux, BSDs, ...) store the backlog in a uint16 and are - # missing overflow protection, so we apply our own overflow protection. - # https://github.com/golang/go/issues/5030 - if not isinstance(backlog, int) and backlog is not None: - raise TypeError(f"backlog must be an int or None, not {backlog!r}") - if backlog is None: - return 0xFFFF - return min(backlog, 0xFFFF) - - async def open_unix_listener( path: str | bytes | os.PathLike[str] | os.PathLike[bytes], *, - mode: int | None = None, # 0o666, + mode: int | None = None, backlog: int | None = None, -) -> trio.UnixSocketListener: +) -> trio.SocketListener: """Create :class:`SocketListener` objects to listen for connections. Opens a connection to the specified `Unix domain socket `__. @@ -76,20 +39,21 @@ async def open_unix_listener( Absolute or relative paths may be used. mode (int or None): The socket file permissions. - UNIX permissions are usually specified in octal numbers. - If you leave this as ``None``, Trio will not change the mode from + UNIX permissions are usually specified in octal numbers. If + you leave this as ``None``, Trio will not change the mode from the operating system's default. backlog (int or None): The listen backlog to use. If you leave this as - ``None`` then Trio will pick a good default. (Currently: whatever - your system has configured as the maximum backlog.) + ``None`` then Trio will pick a good default. (Currently: + whatever your system has configured as the maximum backlog.) Returns: :class:`UnixSocketListener` Raises: - :class:`TypeError` if invalid arguments. + :class:`ValueError` If invalid arguments. :class:`RuntimeError`: If AF_UNIX sockets are not supported. + :class:`FileNotFoundError`: If folder socket file is to be created in does not exist. """ if not HAS_UNIX: raise RuntimeError("Unix sockets are not supported on this platform") @@ -102,28 +66,23 @@ async def open_unix_listener( if not await folder.exists(): raise FileNotFoundError(f"Socket folder does not exist: {folder!r}") - # much more simplified logic vs tcp sockets - one socket type and only one + str_path = str(fspath) + + # much more simplified logic vs tcp sockets - one socket family and only one # possible location to connect to sock = tsocket.socket(AF_UNIX, tsocket.SOCK_STREAM) try: - # See https://github.com/python-trio/trio/issues/39 - if sys.platform != "win32": - sock.setsockopt(tsocket.SOL_SOCKET, tsocket.SO_REUSEADDR, 1) - - await sock.bind(str(fspath)) - - sock.listen(computed_backlog) + await sock.bind(str_path) if mode is not None: await fspath.chmod(mode) - return trio.UnixSocketListener(sock) - except BaseException as exc: + sock.listen(computed_backlog) + + return trio.SocketListener(sock, str_path) + except BaseException: sock.close() - try: - os.unlink(str(fspath)) - except BaseException as exc_2: - raise exc_2 from exc + os.unlink(str_path) raise diff --git a/src/trio/_highlevel_socket.py b/src/trio/_highlevel_socket.py index 38b4cafd7..66398ab9d 100644 --- a/src/trio/_highlevel_socket.py +++ b/src/trio/_highlevel_socket.py @@ -3,7 +3,8 @@ import errno from contextlib import contextmanager, suppress -from os import unlink +from os import PathLike, stat, unlink +from stat import S_ISSOCK from typing import TYPE_CHECKING, Final, overload import trio @@ -71,6 +72,8 @@ class SocketStream(HalfCloseableStream): """ + __slots__ = ("_send_conflict_detector", "socket") + def __init__(self, socket: SocketType) -> None: if not isinstance(socket, tsocket.SocketType): raise TypeError("SocketStream requires a Trio socket object") @@ -355,9 +358,14 @@ class SocketListener(Listener[SocketStream]): incoming connections as :class:`SocketStream` objects. Args: + socket: The Trio socket object to wrap. Must have type ``SOCK_STREAM``, and be listening. + path: Used for keeping track of which path a Unix socket is bound + to. If not ``None``, :meth:`aclose` will unlink this path. + File must have socket mode flag set. + Note that the :class:`SocketListener` "takes ownership" of the given socket; closing the :class:`SocketListener` will also close the socket. @@ -365,80 +373,19 @@ class SocketListener(Listener[SocketStream]): The Trio socket object that this stream wraps. - """ - - def __init__(self, socket: SocketType) -> None: - if not isinstance(socket, tsocket.SocketType): - raise TypeError("SocketListener requires a Trio socket object") - if socket.type != tsocket.SOCK_STREAM: - raise ValueError("SocketListener requires a SOCK_STREAM socket") - try: - listening = socket.getsockopt(tsocket.SOL_SOCKET, tsocket.SO_ACCEPTCONN) - except OSError: - # SO_ACCEPTCONN fails on macOS; we just have to trust the user. - pass - else: - if not listening: - raise ValueError("SocketListener requires a listening socket") - - self.socket = socket - - async def accept(self) -> SocketStream: - """Accept an incoming connection. - - Returns: - :class:`SocketStream` - - Raises: - OSError: if the underlying call to ``accept`` raises an unexpected - error. - ClosedResourceError: if you already closed the socket. + .. attribute:: path - This method handles routine errors like ``ECONNABORTED``, but passes - other errors on to its caller. In particular, it does *not* make any - special effort to handle resource exhaustion errors like ``EMFILE``, - ``ENFILE``, ``ENOBUFS``, ``ENOMEM``. - - """ - while True: - try: - sock, _ = await self.socket.accept() - except OSError as exc: - if exc.errno in _closed_stream_errnos: - raise trio.ClosedResourceError from None - if exc.errno not in _ignorable_accept_errnos: - raise - else: - return SocketStream(sock) - - async def aclose(self) -> None: - """Close this listener and its underlying socket.""" - self.socket.close() - await trio.lowlevel.checkpoint() - - -@final -class UnixSocketListener(Listener[SocketStream]): - """A :class:`~trio.abc.Listener` that uses a listening socket to accept - incoming connections as :class:`SocketStream` objects. - - Args: - socket: The Trio socket object to wrap. Must have type ``SOCK_STREAM``, - and be listening. - - Note that the :class:`UnixSocketListener` "takes ownership" of the given - socket; closing the :class:`UnixSocketListener` will also close the socket - and unlink its associated file. - - .. attribute:: socket - - The Trio socket object that this stream wraps. + The path to unlink in :meth:`aclose` that a Unix socket is bound to. """ - def __init__(self, socket: SocketType) -> None: - if not HAS_UNIX: - raise RuntimeError("Unix sockets are not supported on this platform") + __slots__ = ("path", "socket") + + def __init__( + self, + socket: SocketType, + path: str | bytes | PathLike[str] | PathLike[bytes] | None = None, + ) -> None: if not isinstance(socket, tsocket.SocketType): raise TypeError("SocketListener requires a Trio socket object") if socket.type != tsocket.SOCK_STREAM: @@ -451,8 +398,11 @@ def __init__(self, socket: SocketType) -> None: else: if not listening: raise ValueError("SocketListener requires a listening socket") + if path is not None and not S_ISSOCK(stat(path).st_mode): + raise ValueError("Specified path must be a Unix socket file") self.socket = socket + self.path = path async def accept(self) -> SocketStream: """Accept an incoming connection. @@ -483,8 +433,8 @@ async def accept(self) -> SocketStream: return SocketStream(sock) async def aclose(self) -> None: - """Close this listener, its underlying socket, and unlink its associated file.""" - path = self.socket.getsockname() + """Close this listener and its underlying socket.""" self.socket.close() - unlink(path) + if self.path is not None: + unlink(self.path) await trio.lowlevel.checkpoint() From fe7ef60d1718f42037dca978d8a2a19b3e659151 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Wed, 15 Jan 2025 00:37:03 -0600 Subject: [PATCH 4/9] Add tests for `_highlevel_open_unix_listeners` --- .../test_highlevel_open_unix_listeners.py | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 src/trio/_tests/test_highlevel_open_unix_listeners.py diff --git a/src/trio/_tests/test_highlevel_open_unix_listeners.py b/src/trio/_tests/test_highlevel_open_unix_listeners.py new file mode 100644 index 000000000..793982726 --- /dev/null +++ b/src/trio/_tests/test_highlevel_open_unix_listeners.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import socket as stdlib_socket +import sys +from typing import TYPE_CHECKING, cast + +import pytest + +import trio +import trio.socket as tsocket +from trio import ( + SocketListener, + open_unix_listener, + serve_unix, +) +from trio.testing import open_stream_to_socket_listener + +if TYPE_CHECKING: + from pathlib import Path + + from trio.abc import SendStream + +assert not TYPE_CHECKING or sys.platform != "win32" + + +skip_if_not_unix = pytest.mark.skipif( + not hasattr(tsocket, "AF_UNIX"), + reason="Needs unix socket support", +) + + +@pytest.fixture +def temp_unix_socket_path(tmp_path: Path) -> str: + """Fixture to create a temporary Unix socket path.""" + # Create a temporary file in the tmp_path directory + temp_socket_path = tmp_path / "socket.sock" + return str(temp_socket_path) + + +@skip_if_not_unix +async def test_open_unix_listener_basic(temp_unix_socket_path: str) -> None: + listener = await open_unix_listener(temp_unix_socket_path) + + assert isinstance(listener, SocketListener) + # Check that the listener is using the Unix socket family + assert listener.socket.family == tsocket.AF_UNIX + assert listener.socket.getsockname() == temp_unix_socket_path + + # Make sure the backlog is at least 2 + c1 = await open_stream_to_socket_listener(listener) + c2 = await open_stream_to_socket_listener(listener) + + s1 = await listener.accept() + s2 = await listener.accept() + + # Note that we don't know which client stream is connected to which server + # stream + await s1.send_all(b"x") + await s2.send_all(b"x") + assert await c1.receive_some(1) == b"x" + assert await c2.receive_some(1) == b"x" + + for resource in [c1, c2, s1, s2, listener]: + await resource.aclose() + + +@skip_if_not_unix +async def test_open_unix_listener_specific_path(temp_unix_socket_path: str) -> None: + listener = await open_unix_listener(temp_unix_socket_path) + async with listener: + assert listener.socket.getsockname() == temp_unix_socket_path + + +@skip_if_not_unix +async def test_open_unix_listener_rebind(temp_unix_socket_path: str) -> None: + listener = await open_unix_listener(temp_unix_socket_path) + sockaddr1 = listener.socket.getsockname() + + # Attempt to bind again to the same socket should fail + with stdlib_socket.socket(tsocket.AF_UNIX) as probe: + with pytest.raises( + OSError, + match=r"(Address (already )?in use|An attempt was made to access a socket in a way forbidden by its access permissions)$", + ): + probe.bind(temp_unix_socket_path) + + # Now use the listener to set up some connections + c_established = await open_stream_to_socket_listener(listener) + s_established = await listener.accept() + await listener.aclose() + + # Attempt to bind again should succeed after closing the listener + listener2 = await open_unix_listener(temp_unix_socket_path) + sockaddr2 = listener2.socket.getsockname() + + assert sockaddr1 == sockaddr2 + assert s_established.socket.getsockname() == sockaddr2 + + for resource in [listener2, c_established, s_established]: + await resource.aclose() + + +@skip_if_not_unix +async def test_serve_unix(temp_unix_socket_path: str) -> None: + async def handler(stream: SendStream) -> None: + await stream.send_all(b"x") + + async with trio.open_nursery() as nursery: + # nursery.start is incorrectly typed, awaiting #2773 + value = await nursery.start(serve_unix, handler, temp_unix_socket_path) + assert isinstance(value, list) + listeners = cast("list[SocketListener]", value) + stream = await open_stream_to_socket_listener(listeners[0]) + async with stream: + assert await stream.receive_some(1) == b"x" + nursery.cancel_scope.cancel() + for listener in listeners: + await listener.aclose() + + +@pytest.mark.skipif(hasattr(tsocket, "AF_UNIX"), reason="Test for non-unix platforms") +async def test_error_on_no_unix(temp_unix_socket_path: str) -> None: + with pytest.raises( + RuntimeError, + match=r"^Unix sockets are not supported on this platform$", + ): + async with await open_unix_listener(temp_unix_socket_path): + pass From ec7645426773dd46a7395a859d44502823cbe070 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Wed, 15 Jan 2025 01:13:23 -0600 Subject: [PATCH 5/9] Revert `SocketListener` holding on to `path`, just ask unix socket. --- src/trio/_highlevel_open_unix_listeners.py | 4 +- src/trio/_highlevel_socket.py | 43 ++++++++++++---------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/trio/_highlevel_open_unix_listeners.py b/src/trio/_highlevel_open_unix_listeners.py index aad3b8301..6f247226d 100644 --- a/src/trio/_highlevel_open_unix_listeners.py +++ b/src/trio/_highlevel_open_unix_listeners.py @@ -79,7 +79,7 @@ async def open_unix_listener( sock.listen(computed_backlog) - return trio.SocketListener(sock, str_path) + return trio.SocketListener(sock) except BaseException: sock.close() os.unlink(str_path) @@ -92,7 +92,7 @@ async def serve_unix( *, backlog: int | None = None, handler_nursery: trio.Nursery | None = None, - task_status: TaskStatus[list[trio.UnixSocketListener]] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[list[trio.SocketListener]] = trio.TASK_STATUS_IGNORED, ) -> None: """Listen for incoming UNIX connections, and for each one start a task running ``handler(stream)``. diff --git a/src/trio/_highlevel_socket.py b/src/trio/_highlevel_socket.py index 66398ab9d..810620cae 100644 --- a/src/trio/_highlevel_socket.py +++ b/src/trio/_highlevel_socket.py @@ -3,7 +3,8 @@ import errno from contextlib import contextmanager, suppress -from os import PathLike, stat, unlink +from os import stat, unlink +from os.path import exists from stat import S_ISSOCK from typing import TYPE_CHECKING, Final, overload @@ -18,7 +19,7 @@ from typing_extensions import Buffer - from ._socket import SocketType + from ._socket import AddressFormat, SocketType # XX TODO: this number was picked arbitrarily. We should do experiments to # tune it. (Or make it dynamic -- one idea is to start small and increase it @@ -358,33 +359,25 @@ class SocketListener(Listener[SocketStream]): incoming connections as :class:`SocketStream` objects. Args: - socket: The Trio socket object to wrap. Must have type ``SOCK_STREAM``, and be listening. - path: Used for keeping track of which path a Unix socket is bound - to. If not ``None``, :meth:`aclose` will unlink this path. - File must have socket mode flag set. - Note that the :class:`SocketListener` "takes ownership" of the given - socket; closing the :class:`SocketListener` will also close the socket. + socket; closing the :class:`SocketListener` will also close the + socket, and if it's a Unix socket, it will also unlink the leftover + socket file that the Unix socket is bound to. .. attribute:: socket The Trio socket object that this stream wraps. - .. attribute:: path - - The path to unlink in :meth:`aclose` that a Unix socket is bound to. - """ - __slots__ = ("path", "socket") + __slots__ = ("socket",) def __init__( self, socket: SocketType, - path: str | bytes | PathLike[str] | PathLike[bytes] | None = None, ) -> None: if not isinstance(socket, tsocket.SocketType): raise TypeError("SocketListener requires a Trio socket object") @@ -398,11 +391,8 @@ def __init__( else: if not listening: raise ValueError("SocketListener requires a listening socket") - if path is not None and not S_ISSOCK(stat(path).st_mode): - raise ValueError("Specified path must be a Unix socket file") self.socket = socket - self.path = path async def accept(self) -> SocketStream: """Accept an incoming connection. @@ -433,8 +423,21 @@ async def accept(self) -> SocketStream: return SocketStream(sock) async def aclose(self) -> None: - """Close this listener and its underlying socket.""" + """Close this listener, its underlying socket, and for Unix sockets unlink the socket file.""" + is_unix_socket = self.socket.family == getattr(tsocket, "AF_UNIX", None) + + path: AddressFormat | None = None + if is_unix_socket: + # If unix socket, need to get path before we close socket + # or OS errors + path = self.socket.getsockname() self.socket.close() - if self.path is not None: - unlink(self.path) + # If unix socket, clean up socket file that gets left behind. + if ( + is_unix_socket + and path is not None + and exists(path) + and S_ISSOCK(stat(path).st_mode) + ): + unlink(path) await trio.lowlevel.checkpoint() From 75cc5dfcb3735cc5c1419900cd01e1fc1cb3008d Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Wed, 15 Jan 2025 01:56:37 -0600 Subject: [PATCH 6/9] Fix pyright issue --- src/trio/_highlevel_socket.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/trio/_highlevel_socket.py b/src/trio/_highlevel_socket.py index 810620cae..f7a23364d 100644 --- a/src/trio/_highlevel_socket.py +++ b/src/trio/_highlevel_socket.py @@ -81,7 +81,7 @@ def __init__(self, socket: SocketType) -> None: if socket.type != tsocket.SOCK_STREAM: raise ValueError("SocketStream requires a SOCK_STREAM socket") - self.socket = socket + self.socket: SocketType = socket self._send_conflict_detector = ConflictDetector( "another task is currently sending data on this SocketStream", ) @@ -392,7 +392,7 @@ def __init__( if not listening: raise ValueError("SocketListener requires a listening socket") - self.socket = socket + self.socket: SocketType = socket async def accept(self) -> SocketStream: """Accept an incoming connection. From 705bd968f389154a1a5763babc2b8d0ffc8d5468 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Thu, 16 Jan 2025 18:34:26 -0600 Subject: [PATCH 7/9] Add comment to exports test, fix test issue, and only unlink file on failure if exists --- src/trio/_highlevel_open_unix_listeners.py | 3 ++- src/trio/_tests/test_exports.py | 8 ++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/trio/_highlevel_open_unix_listeners.py b/src/trio/_highlevel_open_unix_listeners.py index 6f247226d..6d2f6bb11 100644 --- a/src/trio/_highlevel_open_unix_listeners.py +++ b/src/trio/_highlevel_open_unix_listeners.py @@ -82,7 +82,8 @@ async def open_unix_listener( return trio.SocketListener(sock) except BaseException: sock.close() - os.unlink(str_path) + if os.path.exists(str_path): + os.unlink(str_path) raise diff --git a/src/trio/_tests/test_exports.py b/src/trio/_tests/test_exports.py index bad9ebec3..5cc7d3a35 100644 --- a/src/trio/_tests/test_exports.py +++ b/src/trio/_tests/test_exports.py @@ -452,8 +452,6 @@ def lookup_symbol(symbol: str) -> dict[str, str]: trio.Process: {"args", "pid", "stderr", "stdin", "stdio", "stdout"}, trio.SSLListener: {"transport_listener"}, trio.SSLStream: {"transport_stream"}, - trio.SocketListener: {"socket"}, - trio.SocketStream: {"socket"}, trio.testing.MemoryReceiveStream: {"close_hook", "receive_some_hook"}, trio.testing.MemorySendStream: { "close_hook", @@ -527,6 +525,12 @@ def lookup_symbol(symbol: str) -> dict[str, str]: print(f"\n{tool} can't see the following symbols in {module_name}:") pprint(errors) + print( + f""" +If there are extra attributes listed, try checking to make sure this test +isn't ignoring them. If there are missing attributes, try looking for why +{tool} isn't seeing them compared to `inspect.getmembers`.""" + ) assert not errors From e8232b485bda2b4cc84fcc9e368a6c5b04f699c5 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Thu, 16 Jan 2025 19:05:09 -0600 Subject: [PATCH 8/9] Help macos socket paths not be too long --- .../test_highlevel_open_unix_listeners.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/trio/_tests/test_highlevel_open_unix_listeners.py b/src/trio/_tests/test_highlevel_open_unix_listeners.py index 793982726..70daa53d7 100644 --- a/src/trio/_tests/test_highlevel_open_unix_listeners.py +++ b/src/trio/_tests/test_highlevel_open_unix_listeners.py @@ -2,6 +2,9 @@ import socket as stdlib_socket import sys +import tempfile +from os import unlink +from os.path import exists from typing import TYPE_CHECKING, cast import pytest @@ -16,6 +19,7 @@ from trio.testing import open_stream_to_socket_listener if TYPE_CHECKING: + from collections.abc import Generator from pathlib import Path from trio.abc import SendStream @@ -30,11 +34,17 @@ @pytest.fixture -def temp_unix_socket_path(tmp_path: Path) -> str: +def temp_unix_socket_path(tmp_path: Path) -> Generator[str, None, None]: """Fixture to create a temporary Unix socket path.""" - # Create a temporary file in the tmp_path directory - temp_socket_path = tmp_path / "socket.sock" - return str(temp_socket_path) + if sys.platform == "darwin": + # On macos, opening unix socket will fail if name is too long + temp_socket_path = tempfile.mkstemp(suffix=".sock") + else: + temp_socket_path = str(tmp_path / "socket.sock") + yield temp_socket_path + # If test failed to delete file at the end, do it for them. + if exists(temp_socket_path): + unlink(temp_socket_path) @skip_if_not_unix From 87e77d0b96c378c46036172e9676c7804e3d092f Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Thu, 16 Jan 2025 19:34:44 -0600 Subject: [PATCH 9/9] Fix macos mkstemp --- src/trio/_tests/test_highlevel_open_unix_listeners.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/trio/_tests/test_highlevel_open_unix_listeners.py b/src/trio/_tests/test_highlevel_open_unix_listeners.py index 70daa53d7..8c8721e45 100644 --- a/src/trio/_tests/test_highlevel_open_unix_listeners.py +++ b/src/trio/_tests/test_highlevel_open_unix_listeners.py @@ -38,7 +38,9 @@ def temp_unix_socket_path(tmp_path: Path) -> Generator[str, None, None]: """Fixture to create a temporary Unix socket path.""" if sys.platform == "darwin": # On macos, opening unix socket will fail if name is too long - temp_socket_path = tempfile.mkstemp(suffix=".sock") + temp_socket_path = tempfile.mkstemp(suffix=".sock")[1] + # mkstemp makes a file, we just wanted a unique name + unlink(temp_socket_path) else: temp_socket_path = str(tmp_path / "socket.sock") yield temp_socket_path