diff --git a/kyo-browser/shared/src/test/scala/kyo/BaseBrowserTest.scala b/kyo-browser/shared/src/test/scala/kyo/BaseBrowserTest.scala index 0f2b70e469..548601f507 100644 --- a/kyo-browser/shared/src/test/scala/kyo/BaseBrowserTest.scala +++ b/kyo-browser/shared/src/test/scala/kyo/BaseBrowserTest.scala @@ -13,7 +13,15 @@ abstract class BaseBrowserTest extends kyo.test.Test[Any]: // through Browser.assert* domain helpers and expected-exception fail-paths (Abort.run(...) { case Failure(_: X) // => () ; case _ => fail(...) }) that do not flow through the kyo.test assert macros, so the per-leaf // evaluation counter sees zero even though the leaf does verify behavior. The check is a false positive here. - override def config = super.config.sequential.failOnNoAssertion(false) + // + // The two opaque-inode descriptor categories are disabled (socket + non-socket fd), not the whole check. SharedChrome + // deliberately holds the headless Chrome process, its CDP connection socket, and the process's stdio pipes open for the + // WHOLE run (torn down at scheduler shutdown, see SharedChrome.ensureStarted). A CDP `socket:[inode]` and a stdio + // `pipe:[inode]` are opaque with no stable identifier an allowlist could match, so the socket and file-descriptor + // categories are the resources that cannot be expressed any finer. The other long-lived resource, the kyo-http + // NioIoDriver event-loop fiber, is already covered by the built-in allowlist, so fiber and thread detection stay on. + override def config = + super.config.sequential.failOnNoAssertion(false).leakCheckSockets(false).leakCheckFileDescriptors(false) // Pre-flight: check whether the current (OS, arch) tuple has a chrome-headless-shell artifact // (mac-arm64 / mac-x64 / linux64 / win64 / win32). Linux/Aarch64 and Windows/ARM have no published diff --git a/kyo-caliban/src/test/scala/kyo/BaseCalibanTest.scala b/kyo-caliban/src/test/scala/kyo/BaseCalibanTest.scala index 02999f1be7..b66d8de564 100644 --- a/kyo-caliban/src/test/scala/kyo/BaseCalibanTest.scala +++ b/kyo-caliban/src/test/scala/kyo/BaseCalibanTest.scala @@ -5,7 +5,13 @@ abstract class BaseCalibanTest extends kyo.test.Test[Any]: // ResolversTest's WebSocket subscription leaves are timing-sensitive: under kyo-test's concurrent leaf pool, // overlapping leaves starve WebSocket message delivery past the collectMessages deadline (these pass reliably on // main, where ScalaTest ran the suite's tests sequentially). Run each suite's leaves sequentially to restore that. - override def config = super.config.sequential + // + // Only the socket category is disabled. The GraphQL server in these suites is Scope-managed (closed on scope exit), + // but the NIO transport defers a listening socket's real fd close to its idle selector's next select(), which nothing + // wakes, so the listener fd outlives the run. That fix belongs to the transport (frozen for the kyo-net rewrite); the + // socket is an opaque socket:[inode] on an ephemeral port that no allowlist can match. File-descriptor, thread, and + // fiber detection stay on. + override def config = super.config.sequential.leakCheckSockets(false) // Run the body THROUGH the ZIO runtime (preserving the kyo<->ZIO/caliban interop these tests cover), // bridging the resulting Future back into a kyo computation so it can be a kyo-test leaf body. diff --git a/kyo-ffi/jvm/src/test/scala/kyo/ffi/internal/NativeLoaderConcurrencyTest.scala b/kyo-ffi/jvm/src/test/scala/kyo/ffi/internal/NativeLoaderConcurrencyTest.scala index 19c8d7b866..60b382ab7a 100644 --- a/kyo-ffi/jvm/src/test/scala/kyo/ffi/internal/NativeLoaderConcurrencyTest.scala +++ b/kyo-ffi/jvm/src/test/scala/kyo/ffi/internal/NativeLoaderConcurrencyTest.scala @@ -1,10 +1,9 @@ package kyo.ffi.internal import java.nio.channels.FileChannel -import java.nio.file.Files -import java.nio.file.Path -import java.nio.file.Paths import java.nio.file.StandardOpenOption +import kyo.* +import kyo.AllowUnsafe.embrace.danger import kyo.ffi.Test /** Resource-extraction concurrency. @@ -24,32 +23,37 @@ class NativeLoaderConcurrencyTest extends Test: override def config = super.config.sequential private def tempDir(): Path = - Files.createTempDirectory("kyo-ffi-f11-").nn + Sync.Unsafe.evalOrThrow(Path.tempDir("kyo-ffi-f11-")) + + // NativeLoader's API, FileChannel, and the extracted-file registry are java.nio.file.Path infra (FFM, cross-process + // advisory locks, atomic-move fallback); bridge the test's kyo.Path values to java only at those call sites. + private def j(p: Path): java.nio.file.Path = + java.nio.file.Path.of(p.toString) "tryCleanupStaleLock" - { "removes an abandoned .lck file with no live lock holder" in { val dir = tempDir() - val lck = dir.resolve("libabc-deadbeef.lck").nn - Files.createFile(lck): Unit - assert(Files.exists(lck) == true) + val lck = dir / "libabc-deadbeef.lck" + lck.unsafe.mkFile().getOrThrow + assert(lck.unsafe.exists() == true) - val removed = NativeLoader.tryCleanupStaleLock(lck) + val removed = NativeLoader.tryCleanupStaleLock(j(lck)) assert(removed == true) - assert(Files.exists(lck) == false) + assert(lck.unsafe.exists() == false) } "leaves a live-locked .lck file in place" in { val dir = tempDir() - val lck = dir.resolve("libxyz-cafef00d.lck").nn - val ch = FileChannel.open(lck, StandardOpenOption.CREATE, StandardOpenOption.WRITE).nn + val lck = dir / "libxyz-cafef00d.lck" + val ch = FileChannel.open(j(lck), StandardOpenOption.CREATE, StandardOpenOption.WRITE).nn try val lk = ch.lock().nn try - val removed = NativeLoader.tryCleanupStaleLock(lck) + val removed = NativeLoader.tryCleanupStaleLock(j(lck)) // Another in-JVM lock holder → tryLock throws OverlappingFileLockException, caller must NOT delete. assert(removed == false) - assert(Files.exists(lck) == true) + assert(lck.unsafe.exists() == true) finally lk.release() end try finally ch.close() @@ -58,14 +62,14 @@ class NativeLoaderConcurrencyTest extends Test: "returns false for a missing lock file" in { val dir = tempDir() - val lck = dir.resolve("does-not-exist.lck").nn - assert(NativeLoader.tryCleanupStaleLock(lck) == false) + val lck = dir / "does-not-exist.lck" + assert(NativeLoader.tryCleanupStaleLock(j(lck)) == false) } } "resolveExtractDir" - { "honours -Dkyo.ffi.extractDir= verbatim" in { - val explicit = tempDir().resolve("f11-explicit").nn + val explicit = tempDir() / "f11-explicit" val prop = "kyo.ffi.extractDir" val prior = Option(java.lang.System.getProperty(prop)) java.lang.System.setProperty(prop, explicit.toString): Unit @@ -96,59 +100,57 @@ class NativeLoaderConcurrencyTest extends Test: "writeAtomicRename" - { "atomically installs the full payload and leaves no .tmp- residue on success" in { val dir = tempDir() - val out = dir.resolve("libpayload-cafebabe.so").nn - val data = "kyo-ffi F11 atomic payload".getBytes + val out = dir / "libpayload-cafebabe.so" + val data = "kyo-ffi F11 atomic payload" - NativeLoader.writeAtomicRename(dir, out, data) + NativeLoader.writeAtomicRename(j(dir), j(out), data.getBytes) - assert(Files.exists(out) == true) - assert(Files.readAllBytes(out).nn.toSeq == data.toSeq) + assert(out.unsafe.exists() == true) + assert(out.unsafe.read().getOrThrow == data) // No `.tmp-` sibling should remain, the atomic rename consumed the temp file. - val entries = Files.list(dir).nn.iterator().nn - while entries.hasNext do - val name = entries.next().nn.getFileName.nn.toString - assert(!name.contains(".tmp-")) - end while + // Path.Unsafe.list closes the dir stream as it collects (no leaked fd). + val entries = dir.unsafe.list().getOrThrow + entries.foreach(entry => assert(!entry.name.getOrElse("").contains(".tmp-"))) } } "cleanupExtractedFiles" - { "removes files newer than install epoch" in { val dir = tempDir() - val fresh = dir.resolve("libfresh-00112233.so").nn - Files.write(fresh, "fresh bytes".getBytes): Unit + val fresh = dir / "libfresh-00112233.so" + fresh.unsafe.write("fresh bytes").getOrThrow val reg = classOf[NativeLoader.type].nn.getDeclaredField("extractedThisJvm").nn reg.setAccessible(true) - val set = reg.get(NativeLoader).asInstanceOf[java.util.Set[Path]] - set.add(fresh): Unit + val set = reg.get(NativeLoader).asInstanceOf[java.util.Set[java.nio.file.Path]] + set.add(j(fresh)): Unit try // Install epoch is BEFORE file creation, so fresh file's mtime ≥ install → deleted. NativeLoader.cleanupExtractedFiles(0L) - assert(Files.exists(fresh) == false) - finally set.remove(fresh): Unit + assert(fresh.unsafe.exists() == false) + finally set.remove(j(fresh)): Unit end try } "leaves files older than install epoch alone" in { val dir = tempDir() - val old_ = dir.resolve("libold-44556677.so").nn - Files.write(old_, "old bytes".getBytes): Unit + val old_ = dir / "libold-44556677.so" + old_.unsafe.write("old bytes").getOrThrow val reg = classOf[NativeLoader.type].nn.getDeclaredField("extractedThisJvm").nn reg.setAccessible(true) - val set = reg.get(NativeLoader).asInstanceOf[java.util.Set[Path]] - set.add(old_): Unit + val set = reg.get(NativeLoader).asInstanceOf[java.util.Set[java.nio.file.Path]] + set.add(j(old_)): Unit try // Install epoch is FAR in the future, every file is older → none deleted. val farFuture = java.lang.System.currentTimeMillis() + (1000L * 60 * 60 * 24 * 365) NativeLoader.cleanupExtractedFiles(farFuture) - assert(Files.exists(old_) == true) + assert(old_.unsafe.exists() == true) finally - set.remove(old_): Unit - Files.deleteIfExists(old_): Unit + set.remove(j(old_)): Unit + discard(old_.unsafe.remove()) end try } } diff --git a/kyo-ffi/jvm/src/test/scala/kyo/ffi/internal/NativeLoaderForkStressTest.scala b/kyo-ffi/jvm/src/test/scala/kyo/ffi/internal/NativeLoaderForkStressTest.scala index 6ee0f64905..bcc6d8e227 100644 --- a/kyo-ffi/jvm/src/test/scala/kyo/ffi/internal/NativeLoaderForkStressTest.scala +++ b/kyo-ffi/jvm/src/test/scala/kyo/ffi/internal/NativeLoaderForkStressTest.scala @@ -1,11 +1,10 @@ package kyo.ffi.internal -import java.nio.file.Files -import java.nio.file.Path -import java.nio.file.Paths import java.security.MessageDigest import java.util.concurrent.Executors import java.util.concurrent.TimeUnit +import kyo.* +import kyo.AllowUnsafe.embrace.danger import kyo.ffi.Test import scala.concurrent.Await import scala.concurrent.ExecutionContext @@ -26,12 +25,12 @@ class NativeLoaderForkStressTest extends Test: // Resolve the JVM test classpath and a java binary so we can ProcessBuilder a child JVM that loads NativeLoaderForkMain. private val javaHome: Path = - Paths.get(java.lang.System.getProperty("java.home").nn).nn + Path(java.lang.System.getProperty("java.home").nn) private val javaBin: Path = - val candidate = javaHome.resolve("bin").nn.resolve("java").nn - if Files.exists(candidate) then candidate - else javaHome.resolve("bin").nn.resolve("java.exe").nn // Windows fallback; test will also cover Windows hosts. + val candidate = javaHome / "bin" / "java" + if candidate.unsafe.exists() then candidate + else javaHome / "bin" / "java.exe" // Windows fallback; test will also cover Windows hosts. end javaBin // The test runs in-VM; java.class.path contains the test runtime classpath (prod classes + test classes + scalatest + deps). @@ -50,7 +49,7 @@ class NativeLoaderForkStressTest extends Test: "fork N JVMs extract the same payload concurrently" in { val forkN = sys.props.getOrElse("kyo.ffi.testForkN", "4").toInt val payload = ("F11-fork-stress-payload-" + java.util.UUID.randomUUID()).getBytes() - val dir = Files.createTempDirectory("kyo-ffi-fork-").nn + val dir = Sync.Unsafe.evalOrThrow(Path.tempDir("kyo-ffi-fork-")) val libId = s"forkstress_${java.lang.System.currentTimeMillis()}" val hex = hexEncode(payload) @@ -87,19 +86,11 @@ class NativeLoaderForkStressTest extends Test: assert(reportedPaths.size == 1) // The final extracted file exists, matches the payload byte-for-byte (atomic rename guarantee: no partial writes). - val finalPath = Paths.get(reportedPaths.head).nn - assert(Files.exists(finalPath) == true) - assert(Files.readAllBytes(finalPath).nn.toSeq == payload.toSeq) - // No `.tmp-` residue from any child, atomic rename cleaned up every interim write. - val tmpLeftovers = Files.list(dir).nn.iterator().nn - val residue = - val buf = scala.collection.mutable.Buffer.empty[String] - while tmpLeftovers.hasNext do - val n = tmpLeftovers.next().nn.getFileName.nn.toString - if n.contains(".tmp-") then buf += n - end while - buf.toList - end residue + val finalPath = Path(reportedPaths.head) + assert(finalPath.unsafe.exists() == true) + assert(finalPath.unsafe.read().getOrThrow == new String(payload)) + // No `.tmp-` residue from any child; Path.Unsafe.list closes the dir stream as it collects (no leaked fd). + val residue = dir.unsafe.list().getOrThrow.map(_.name.getOrElse("")).filter(_.contains(".tmp-")).toList assert(residue == Nil) finally pool.shutdownNow(): Unit diff --git a/kyo-http/jvm/src/test/scala/kyo/internal/NioTransportTest.scala b/kyo-http/jvm/src/test/scala/kyo/internal/NioTransportTest.scala index 5f1b807111..3ec56e0ecb 100644 --- a/kyo-http/jvm/src/test/scala/kyo/internal/NioTransportTest.scala +++ b/kyo-http/jvm/src/test/scala/kyo/internal/NioTransportTest.scala @@ -22,7 +22,6 @@ class NioTransportTest extends kyo.BaseHttpTest: // ----------------------------------------------------------------------- "init stores IoDriverPool" in { - given Frame = Frame.internal val transport = mkTransport() try assert(transport.pool ne null) @@ -32,7 +31,6 @@ class NioTransportTest extends kyo.BaseHttpTest: } "init stores IoDriver in pool" in { - given Frame = Frame.internal val transport = mkTransport() try val driver = transport.pool.next() @@ -47,7 +45,6 @@ class NioTransportTest extends kyo.BaseHttpTest: // ----------------------------------------------------------------------- "connect to loopback server returns open connection" in { - given Frame = Frame.internal val transport = mkTransport() // Use a latch so the server-side socket stays open until after we check isOpen @@ -82,7 +79,6 @@ class NioTransportTest extends kyo.BaseHttpTest: } "connect returns Closed failure for unreachable port" in { - given Frame = Frame.internal val transport = mkTransport() // Port 1 — connection refused on loopback Abort.run[Closed](transport.connect("127.0.0.1", 1).safe.get).map { result => @@ -97,7 +93,6 @@ class NioTransportTest extends kyo.BaseHttpTest: // ----------------------------------------------------------------------- "listen binds and returns Listener with valid port" in { - given Frame = Frame.internal val transport = mkTransport() val listenFiber = transport.listen("127.0.0.1", 0, 50)(_ => ()) listenFiber.safe.get.map { listener => @@ -111,7 +106,6 @@ class NioTransportTest extends kyo.BaseHttpTest: } "listen binds to the given host" in { - given Frame = Frame.internal val transport = mkTransport() val listenFiber = transport.listen("127.0.0.1", 0, 50)(_ => ()) listenFiber.safe.get.map { listener => @@ -124,7 +118,6 @@ class NioTransportTest extends kyo.BaseHttpTest: } "listen returns Closed failure for already-bound port" in { - given Frame = Frame.internal val transport = mkTransport() transport.listen("127.0.0.1", 0, 50)(_ => ()).safe.get.map { listener1 => @@ -142,7 +135,6 @@ class NioTransportTest extends kyo.BaseHttpTest: // ----------------------------------------------------------------------- "listen accepts connecting clients" in { - given Frame = Frame.internal val transport = mkTransport() val accepted = new java.util.concurrent.atomic.AtomicBoolean(false) @@ -173,7 +165,6 @@ class NioTransportTest extends kyo.BaseHttpTest: // ----------------------------------------------------------------------- "connect + listen — both sides are open connections" in { - given Frame = Frame.internal val transport = mkTransport() // Use a latch to hold the server-side connection open until client checks isOpen @@ -207,7 +198,6 @@ class NioTransportTest extends kyo.BaseHttpTest: // ----------------------------------------------------------------------- "connect with TLS to loopback TLS server completes handshake" in { - given Frame = Frame.internal val transport = mkTransport() val tlsConfig = TlsTestHelper.serverTlsConfig @@ -238,7 +228,6 @@ class NioTransportTest extends kyo.BaseHttpTest: // ----------------------------------------------------------------------- "listener.close marks listener as closed" in { - given Frame = Frame.internal val transport = mkTransport() transport.listen("127.0.0.1", 0, 50)(_ => ()).safe.get.map { listener => @@ -281,7 +270,6 @@ class NioTransportTest extends kyo.BaseHttpTest: // ----------------------------------------------------------------------- "connectUnix returns Closed for non-existent socket path" in { - given Frame = Frame.internal val transport = mkTransport() val badPath = "/tmp/kyo-nio-test-does-not-exist-" + java.util.UUID.randomUUID() + ".sock" Abort.run[Closed](transport.connectUnix(badPath).safe.get).map { result => @@ -290,4 +278,42 @@ class NioTransportTest extends kyo.BaseHttpTest: } } + // ----------------------------------------------------------------------- + // Accepted-connection cleanup on server close (socket leak regression) + // ----------------------------------------------------------------------- + + // Guards the accepted-connection cleanup that the kyo-caliban socket leak needs: each iteration scopes an ephemeral + // server and makes one request through the process-global default HttpClient (whose pooled connection stays alive, + // not closed per leaf, exactly like caliban's static HttpClient.getText/postText usage). A server that closed only + // its listening socket on shutdown would leave every accepted connection open, so the accepted socket plus its + // still-pooled client peer would accumulate across iterations (closed only by a 60s idle timer). Closing the + // accepted connections on server close sends a FIN that also clears the client's stale pooled connection, so the + // open-fd count stays stable across iterations. JVM-only: the open-descriptor count comes from the Unix OS MXBean. + "closing a server releases its accepted connections (no socket accumulation)" in { + java.lang.management.ManagementFactory.getOperatingSystemMXBean match + case osBean: com.sun.management.UnixOperatingSystemMXBean => + val route = HttpRoute.getRaw("hello").response(_.bodyText) + val handler = route.handler(_ => HttpResponse.ok("world")) + def cycle: Unit < (Async & Abort[Any]) = + Scope.run { + HttpServer.init(0, "localhost")(handler).map { server => + HttpClient.getText(s"http://localhost:${server.port}/hello").unit + } + } + for + _ <- cycle // warm up lazy infra (default client, transport, classloading) + before <- Sync.defer { java.lang.System.gc(); osBean.getOpenFileDescriptorCount } + _ <- Kyo.foreachDiscard(1 to 20)(_ => cycle) + _ <- Async.sleep(500.millis) // let any async connection closes settle + after <- Sync.defer { java.lang.System.gc(); osBean.getOpenFileDescriptorCount } + yield assert( + after - before <= 8, + s"sockets accumulated across 20 server+client cycles: before=$before after=$after delta=${after - before}" + ) + end for + case _ => + cancel("UnixOperatingSystemMXBean unavailable on this JVM") + end match + } + end NioTransportTest diff --git a/kyo-http/shared/src/main/scala/kyo/HttpServer.scala b/kyo-http/shared/src/main/scala/kyo/HttpServer.scala index 7ca4c80580..72c2ed4435 100644 --- a/kyo-http/shared/src/main/scala/kyo/HttpServer.scala +++ b/kyo-http/shared/src/main/scala/kyo/HttpServer.scala @@ -5,6 +5,7 @@ import kyo.internal.HttpPlatformTransport import kyo.internal.codec.OpenApiGenerator import kyo.internal.server.HttpRouter import kyo.internal.server.UnsafeServerDispatch +import kyo.internal.transport.Connection import kyo.internal.transport.Transport /** HTTP server that binds one or more handlers to a port and manages the server lifecycle. @@ -187,30 +188,40 @@ object HttpServer: handlers: Seq[HttpHandler[?, ?, ?]] )(using AllowUnsafe, Frame): Fiber.Unsafe[Unsafe, Abort[Closed]] = val router = HttpRouter(handlers, config.cors) + // Track every accepted connection so the server can close them on shutdown: the transport listener owns only + // the listening socket, so an accepted keep-alive connection would otherwise stay open until a 60s idle timer + // fires (it leaks whenever the peer keeps its side pooled rather than sending an EOF). The shared registry is + // the same mechanism HttpClientBackend uses for the connections it creates. + val registry = new kyo.internal.ConnectionRegistry[Connection[H]] + def tracked(conn: Connection[H]): Unit < Async = + // Prune closed entries on accept (no per-connection close hook), then register this one. register closes + // the connection itself and returns false when a shutdown races this accept, so the connection is + // neither served nor left open, and a failing close is contained inside the registry rather than + // surfacing on the accept path. + registry.pruneClosed(_.isOpen) + if registry.register(conn)(_.close()) then + UnsafeServerDispatch.serve(router, conn.inbound, conn.outbound, config) + else + Kyo.unit + end if + end tracked val listenFiber = (config.unixSocket, config.tls) match case (Present(path), _) => - transport.listenUnix(path, config.backlog) { conn => - UnsafeServerDispatch.serve(router, conn.inbound, conn.outbound, config) - } + transport.listenUnix(path, config.backlog)(tracked) case (Absent, Present(tls)) => - transport.listen(config.host, config.port, config.backlog, tls) { conn => - UnsafeServerDispatch.serve(router, conn.inbound, conn.outbound, config) - } + transport.listen(config.host, config.port, config.backlog, tls)(tracked) case _ => - transport.listen(config.host, config.port, config.backlog) { conn => - UnsafeServerDispatch.serve(router, conn.inbound, conn.outbound, config) - } - listenFiber.map { listener => - new ListenerUnsafe(listener) - } + transport.listen(config.host, config.port, config.backlog)(tracked) + listenFiber.map(listener => new ListenerUnsafe(listener, registry)) end init end Unsafe // --- Private implementations --- - /** Unsafe implementation wrapping a Listener from Transport. */ - final private class ListenerUnsafe( - listener: kyo.internal.transport.Listener + /** Unsafe implementation wrapping a Listener from Transport, plus the registry of accepted connections it owns. */ + final private class ListenerUnsafe[H]( + listener: kyo.internal.transport.Listener, + registry: kyo.internal.ConnectionRegistry[Connection[H]] )(using allow: AllowUnsafe) extends Unsafe: private val closedPromise = Promise.Unsafe.init[Unit, Any]() @@ -225,8 +236,23 @@ object HttpServer: def address: HttpAddress = listener.address def closeFiber(gracePeriod: Duration)(using AllowUnsafe, Frame): Fiber.Unsafe[Unit, Any] = - listener.close() - discard(closedPromise.completeDiscard(Result.succeed(()))) + listener.close() // stop accepting new connections first + registry.markClosing() // any accept racing the close now closes itself in `tracked` instead of being orphaned + + def forceCloseAndComplete(): Unit = + registry.closeAll(_.close()) + discard(closedPromise.completeDiscard(Result.succeed(()))) + end forceCloseAndComplete + + if gracePeriod <= Duration.Zero then + // closeNow: force-close every accepted connection at once, the path the Scope finalizer (`_.closeNow`) + // takes. Accepted keep-alive connections otherwise outlive the listening socket, held open until the + // 60s idle timer fires. + forceCloseAndComplete() + else + // Graceful: let in-flight connections run for the grace period, then force-close whatever remains. + discard(Clock.live.unsafe.sleep(gracePeriod).onComplete(_ => forceCloseAndComplete())) + end if closedPromise end closeFiber diff --git a/kyo-http/shared/src/main/scala/kyo/internal/ConnectionRegistry.scala b/kyo-http/shared/src/main/scala/kyo/internal/ConnectionRegistry.scala new file mode 100644 index 0000000000..272899f3cc --- /dev/null +++ b/kyo-http/shared/src/main/scala/kyo/internal/ConnectionRegistry.scala @@ -0,0 +1,61 @@ +package kyo.internal + +import java.util.concurrent.ConcurrentHashMap +import kyo.* + +/** Tracks every live connection a client or server holds so all of them can be closed on shutdown. + * + * The transport's I/O driver is process-global and is never closed, so a connection's close happens here at the kyo-http + * layer rather than through the driver. `HttpClientBackend` records the connections it creates and `HttpServer` records + * the connections it accepts; both register them here and close the lot when their own `close` runs. + * + * Thread-safety: a connection is closed exactly once even when a registration races `closeAll`. Both the registering + * caller and `closeAll` claim a connection by `remove`-ing it from the set, and only the caller that wins the remove + * closes it. `register` also closes the connection itself when it observes shutdown, so a caller that is interrupted + * right after registering never leaks its connection, and `closeAll` never drops a still-open connection. + */ +final private[kyo] class ConnectionRegistry[C]: + + private val conns = ConcurrentHashMap.newKeySet[C]().nn + @volatile private var closingFlag = false + + /** Register a live connection. Returns true if the caller may keep using it, or false if a shutdown is in progress, + * in which case the connection has already been closed (through `close`) and the caller must not use it. + */ + def register(conn: C)(close: C => Unit): Boolean = + discard(conns.add(conn)) + if !closingFlag then true + else + // Shutdown raced this registration. Claim the connection back: if we remove it, closeAll has not closed it, + // so close it here; if we cannot, closeAll already claimed and closed it. Either way it is closed exactly once. + if conns.remove(conn) then runClose(conn, close) + false + end if + end register + + /** Drop a connection that is being closed through another path (for example evicted from a pool). */ + def remove(conn: C): Unit = discard(conns.remove(conn)) + + /** Drop entries that are no longer open, bounding the set without needing a per-connection close hook. */ + def pruneClosed(isOpen: C => Boolean): Unit = discard(conns.removeIf(c => !isOpen(c))) + + /** Begin shutdown without closing anything yet, so a connection registered after this closes itself in `register` + * instead of being used. Used when there is work to do (a grace period, a pool drain) before closing connections. + */ + def markClosing(): Unit = closingFlag = true + + /** Mark closing and close every registered connection, claiming each through `remove` so a connection a concurrent + * `register` is adding is closed exactly once (whichever of the two removes it wins). + */ + def closeAll(close: C => Unit): Unit = + closingFlag = true + conns.forEach { c => + if conns.remove(c) then runClose(c, close) + } + end closeAll + + private def runClose(conn: C, close: C => Unit): Unit = + try close(conn) + catch case _: Throwable => () + +end ConnectionRegistry diff --git a/kyo-http/shared/src/main/scala/kyo/internal/client/HttpClientBackend.scala b/kyo-http/shared/src/main/scala/kyo/internal/client/HttpClientBackend.scala index 0f9940f922..7ca8510d45 100644 --- a/kyo-http/shared/src/main/scala/kyo/internal/client/HttpClientBackend.scala +++ b/kyo-http/shared/src/main/scala/kyo/internal/client/HttpClientBackend.scala @@ -1,7 +1,6 @@ package kyo.internal.client import java.io.IOException -import java.util.concurrent.ConcurrentHashMap import kyo.* import kyo.internal.codec.* import kyo.internal.http1.* @@ -30,7 +29,7 @@ final private[kyo] class HttpClientBackend[Handle] private ( transportConfig: HttpTransportConfig, defaultTlsConfig: HttpTlsConfig, private val pool: ConnectionPool[HttpConnection[Handle]], - private val allConnections: ConcurrentHashMap[HttpConnection[Handle], Unit], + private val registry: kyo.internal.ConnectionRegistry[HttpConnection[Handle]], val maxConnectionsPerHost: Int, val clientFrame: Frame ): @@ -845,14 +844,14 @@ final private[kyo] class HttpClientBackend[Handle] private ( // -- Pool management and orchestration layer -- - @volatile private var clientClosed = false @volatile private var closingGracePeriod: Duration = Duration.Zero - /** Track a newly created connection. If the client has already been closed, close it immediately. */ + /** Track a newly created connection. If the client is already closing, register closes it immediately and the + * in-flight request then fails on the closed connection, the same outcome as an explicit close-after-add but + * without the race where a concurrent closeAll could drop a connection it had not closed. + */ private def trackConn(conn: HttpConnection[Handle])(using AllowUnsafe, Frame): Unit = - discard(allConnections.put(conn, ())) - if clientClosed then - closeUnsafe(conn, closingGracePeriod) + discard(registry.register(conn)(c => closeUnsafe(c, closingGracePeriod))) end trackConn /** Release a connection back to the pool or discard it on error. Does NOT touch allConnections — removal happens only in discardConn @@ -1051,18 +1050,15 @@ final private[kyo] class HttpClientBackend[Handle] private ( sendBuffered(conn, route, request) def closeFiber(gracePeriod: Duration)(using AllowUnsafe, Frame): Fiber.Unsafe[Unit, Any] = - // Mark closed FIRST so any new connection gets closed immediately by trackConn. + // Mark closing FIRST so any new connection gets closed immediately by trackConn. closingGracePeriod = gracePeriod - clientClosed = true - // Close pool to stop reuse, then close all tracked connections. - // allConnections has every connection from creation to close (discardConn removes them). - // Pool idle connections are a subset — closing from allConnections covers everything. + registry.markClosing() + // Close the pool to stop reuse, then close every tracked connection. The registry holds every connection from + // creation to close (the pool's discard removes them), and idle pooled connections are a subset, so closing from + // the registry covers everything. discard(pool.close()) val closePromise = Promise.Unsafe.init[Unit, Any]() - allConnections.forEach { (conn, _) => - closeUnsafe(conn, gracePeriod) - } - allConnections.clear() + registry.closeAll(conn => closeUnsafe(conn, gracePeriod)) closePromise.completeDiscard(Result.succeed(())) closePromise end closeFiber @@ -1078,13 +1074,13 @@ private[kyo] object HttpClientBackend: idleConnectionTimeout: Duration, defaultTlsConfig: HttpTlsConfig = HttpTlsConfig.default )(using AllowUnsafe, Frame): HttpClientBackend[H] = - val conns = new ConcurrentHashMap[HttpConnection[H], Unit]() + val registry = new kyo.internal.ConnectionRegistry[HttpConnection[H]] val pool = ConnectionPool.init[HttpConnection[H]]( maxConnsPerHost, idleConnectionTimeout, conn => conn.transport.isOpen, conn => - discard(conns.remove(conn)) + registry.remove(conn) conn.http1.close() conn.transport.close() ) @@ -1093,7 +1089,7 @@ private[kyo] object HttpClientBackend: HttpTransportConfig.default, defaultTlsConfig, pool, - conns, + registry, maxConnsPerHost, summon[Frame] ) diff --git a/kyo-http/shared/src/test/scala/kyo/BaseHttpTest.scala b/kyo-http/shared/src/test/scala/kyo/BaseHttpTest.scala index f33a936084..71392523af 100644 --- a/kyo-http/shared/src/test/scala/kyo/BaseHttpTest.scala +++ b/kyo-http/shared/src/test/scala/kyo/BaseHttpTest.scala @@ -2,6 +2,13 @@ package kyo abstract class BaseHttpTest extends kyo.test.Test[Any]: + // Only the socket category is disabled. These suites start and Scope-close HTTP servers, but the NIO transport + // defers a listening socket's real fd close to its idle selector's next select(), which nothing wakes, so the + // listener fd outlives the run. That fix belongs to the transport (frozen for the kyo-net rewrite); the socket is an + // opaque socket:[inode] on an ephemeral port that no allowlist can match. File-descriptor, thread, and fiber + // detection stay on. + override def config = super.config.leakCheckSockets(false) + // Linux Native CI HTTP server bring-up + per-request latency can exceed the production 5-second HttpClient // default, so every test request would fail with HttpTimeoutException. Wrap every leaf so test requests get a // 60s client request timeout (production users still see the 5s default until they set their own via withConfig). diff --git a/kyo-jsonrpc-http/src/test/scala/kyo/JsonRpcHttpTransportTest.scala b/kyo-jsonrpc-http/src/test/scala/kyo/JsonRpcHttpTransportTest.scala index a56a98f7d4..bda9f9b11c 100644 --- a/kyo-jsonrpc-http/src/test/scala/kyo/JsonRpcHttpTransportTest.scala +++ b/kyo-jsonrpc-http/src/test/scala/kyo/JsonRpcHttpTransportTest.scala @@ -2,6 +2,13 @@ package kyo class JsonRpcHttpTransportTest extends kyo.test.Test[Any]: + // Only the socket category is disabled. These tests run an HttpServer and HttpClient over the NIO transport, which + // defers a closed channel's real fd close to its idle selector's next select(), which nothing wakes, so the + // listener/connection fd outlives the run. That fix belongs to the transport (frozen for the kyo-net rewrite); the + // socket is an opaque socket:[inode] no allowlist can match. File-descriptor, thread, and fiber detection stay on. + // Same opt-out and reason as BaseHttpTest. + override def config = super.config.leakCheckSockets(false) + // Linux Native CI HTTP server bring-up + per-request latency can exceed the production 5-second HttpClient // default. Wrap every leaf so test requests get a 60s client request timeout (mirrors BaseHttpTest). override def aroundLeaf[A](body: A < (Async & Abort[Any] & Scope))(using Frame): A < (Async & Abort[Any] & Scope) = diff --git a/kyo-pod/shared/src/test/scala/kyo/BasePodTest.scala b/kyo-pod/shared/src/test/scala/kyo/BasePodTest.scala index 593ea17c4f..3b78371ec0 100644 --- a/kyo-pod/shared/src/test/scala/kyo/BasePodTest.scala +++ b/kyo-pod/shared/src/test/scala/kyo/BasePodTest.scala @@ -34,7 +34,13 @@ abstract class BasePodTest extends kyo.test.Test[Any]: // assumes "<=1 in-flight container op per daemon", which only holds with sequential leaves. kyo-test defaults to // parallel leaves whereas the ScalaTest base ran them sequentially, so restore that. Without it, parallel leaves // race the daemon and produce port conflicts, already-exists, image-pull, and backend errors. - override def config = super.config.sequential + // + // Only the socket category is disabled. These suites reach the podman/docker daemon REST API over HttpClient, and + // those connections are Scope-closed on scope exit, but the NIO transport defers a connection's real fd close to its + // idle selector's next select(), which nothing wakes, so the fd outlives the run. That fix belongs to the transport + // (frozen for the kyo-net rewrite); the socket is an opaque socket:[inode] no allowlist can match. File-descriptor, + // thread, and fiber detection stay on. + override def config = super.config.sequential.leakCheckSockets(false) // Linux CI's container runtime (podman REST API) intermittently takes longer than // the production 5-second `HttpClientConfig.timeout` default for ordinary Container ops diff --git a/kyo-reactive-streams/shared/src/main/scala/kyo/interop/flow/StreamSubscription.scala b/kyo-reactive-streams/shared/src/main/scala/kyo/interop/flow/StreamSubscription.scala index 0455582e99..90e67c3ffe 100644 --- a/kyo-reactive-streams/shared/src/main/scala/kyo/interop/flow/StreamSubscription.scala +++ b/kyo-reactive-streams/shared/src/main/scala/kyo/interop/flow/StreamSubscription.scala @@ -29,18 +29,25 @@ final private[kyo] class StreamSubscription[V, S]( private[interop] def subscribe(using Frame): Unit < Sync = Sync.defer(subscriber.onSubscribe(this)) private[interop] def poll(using Tag[Poll[Chunk[V]]], Frame): StreamComplete < (Async & Poll[Chunk[V]] & Abort[StreamCanceled]) = - def loopPoll(requesting: Long): (Chunk[V] | StreamComplete) < (Sync & Poll[Chunk[V]]) = - Loop[Long, Chunk[V] | StreamComplete, Sync & Poll[Chunk[V]]](requesting): requesting => - Poll.andMap: - case Present(values) => - if values.size <= requesting then - Sync.defer(values.foreach(subscriber.onNext(_))) - .andThen(Loop.continue(requesting - values.size)) - else - Sync.defer(values.take(requesting.intValue).foreach(subscriber.onNext(_))) - .andThen(Loop.done(values.drop(requesting.intValue))) - case Absent => - Sync.defer(Loop.done(StreamComplete)) + def loopPoll(requesting: Long): (Chunk[V] | StreamComplete) < (Sync & Poll[Chunk[V]] & Abort[StreamCanceled]) = + Loop[Long, Chunk[V] | StreamComplete, Sync & Poll[Chunk[V]] & Abort[StreamCanceled]](requesting): requesting => + // Stop draining the stream as soon as the subscription is cancelled. cancel() closes requestChannel; the outer + // take-loop already observes that, but a single loopPoll call holding a large outstanding demand would otherwise + // keep pulling and emitting the whole stream (calling onNext past cancellation) and leave this fiber running long + // after the subscriber is gone (a leaked, effectively-uninterruptible consumer). Signal StreamCanceled (not + // StreamComplete) so cancellation does not deliver a terminal onComplete, matching the outer take-loop's path. + if requestChannel.closed() then Abort.fail(StreamCanceled) + else + Poll.andMap: + case Present(values) => + if values.size <= requesting then + Sync.defer(values.foreach(subscriber.onNext(_))) + .andThen(Loop.continue(requesting - values.size)) + else + Sync.defer(values.take(requesting.intValue).foreach(subscriber.onNext(_))) + .andThen(Loop.done(values.drop(requesting.intValue))) + case Absent => + Sync.defer(Loop.done(StreamComplete)) Loop[Chunk[V], StreamComplete, Async & Poll[Chunk[V]] & Abort[StreamCanceled]](Chunk.empty[V]): leftOver => Abort.run[Closed](requestChannel.safe.take).map: diff --git a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/CancellationTest.scala b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/CancellationTest.scala index 3137355319..b765c37b6a 100644 --- a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/CancellationTest.scala +++ b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/CancellationTest.scala @@ -47,4 +47,44 @@ final class CancellationTest extends kyo.test.Test[Any]: sub.cancel() }.andThen(succeed("no subscriber callback fired after repeated cancel across all attempts")) } + + "a mid-drain cancel with large outstanding demand stops emission and never completes" in { + // Regression for StreamSubscription.loopPoll: with a large outstanding demand the drain loop must observe + // cancel() and stop, raising StreamCanceled, instead of pulling the rest of the stream (calling onNext past + // cancellation) and then delivering a terminal onComplete while leaving the consumer fiber running. The + // consumer fiber is awaited directly so the outcome is deterministic, not timing-dependent. + import AllowUnsafe.embrace.danger + val chunkSize = 16 + val total = 100000 + val bigStream = Stream.range(0, total, 1, chunkSize) + + val onNextCount = new java.util.concurrent.atomic.AtomicInteger(0) + val completedFlag = new java.util.concurrent.atomic.AtomicBoolean(false) + val capturedSub = new java.util.concurrent.atomic.AtomicReference[Subscription]() + val subscriber = new Subscriber[Int]: + def onSubscribe(s: Subscription): Unit = capturedSub.set(s) + def onNext(t: Int): Unit = + discard(onNextCount.incrementAndGet()) + // The first delivered element cancels mid-drain, while demand is still outstanding. + val s = capturedSub.get() + if s != null then s.cancel() + end onNext + def onComplete(): Unit = completedFlag.set(true) + def onError(e: Throwable): Unit = () + + for + sub <- Sync.Unsafe.defer(new StreamSubscription[Int, Any](bigStream, subscriber)) + _ <- sub.subscribe + fiber <- sub.consume + _ <- Sync.defer(sub.request(Long.MaxValue)) + result <- fiber.getResult + yield + assert(result.isFailure, s"consumer must end in StreamCanceled (a Failure), got: $result") + assert(!completedFlag.get(), "onComplete must not be delivered after a mid-drain cancel") + assert( + onNextCount.get() < total, + s"emission must stop after cancel, not drain all $total elements (emitted ${onNextCount.get()})" + ) + end for + } end CancellationTest diff --git a/kyo-tasty/jvm/src/test/scala/kyo/TastyPropertyClasspathScanTest.scala b/kyo-tasty/jvm/src/test/scala/kyo/TastyPropertyClasspathScanTest.scala index f379798bd2..b1bf445b3b 100644 --- a/kyo-tasty/jvm/src/test/scala/kyo/TastyPropertyClasspathScanTest.scala +++ b/kyo-tasty/jvm/src/test/scala/kyo/TastyPropertyClasspathScanTest.scala @@ -1,12 +1,7 @@ package kyo -import java.io.File -import java.nio.file.Files -import java.nio.file.Path -import java.nio.file.Paths import kyo.internal.TestClasspaths import kyo.internal.tasty.query.ClasspathOrchestrator -import scala.jdk.CollectionConverters.* /** Walks every kyo-* directory on the live JVM test classpath (via java.class.path) and decodes each as an independent single-root * Classpath, asserting zero UnknownTagInPosition errors across the full set. @@ -42,15 +37,19 @@ class TastyPropertyClasspathScanTest extends kyo.test.Test[Any]: */ private def discoverKyoClasspathRoots: List[String] = val accumulator = new scala.collection.mutable.LinkedHashSet[String]() - TestClasspaths.all.foreach { root => - val f = new File(root) - if f.exists && f.isDirectory then - Files.walk(f.toPath).iterator.asScala.foreach { p => - val s = p.toString + // Path.Unsafe.list collects each directory's entries and closes the stream before returning (no leaked fd); + // recurse to cover the full tree without following symlinks. + def recurse(dir: Path): Unit = + dir.unsafe.list().getOrThrow.foreach { entry => + if entry.unsafe.isDirectory() && !entry.unsafe.isSymbolicLink() then recurse(entry) + else + val s = entry.toString if s.endsWith(".tasty") && !s.contains("/test/") && !s.contains("/fixtures/") then - discard(accumulator += p.getParent.toString) - } - end if + entry.parent.foreach(parent => discard(accumulator += parent.toString)) + } + TestClasspaths.all.foreach { root => + val dir = Path(root) + if dir.unsafe.exists() && dir.unsafe.isDirectory() then recurse(dir) } accumulator.toList.sorted end discoverKyoClasspathRoots diff --git a/kyo-test/api/shared/src/main/scala/kyo/test/RunConfig.scala b/kyo-test/api/shared/src/main/scala/kyo/test/RunConfig.scala index 01133ba9af..ed3fcb73d2 100644 --- a/kyo-test/api/shared/src/main/scala/kyo/test/RunConfig.scala +++ b/kyo-test/api/shared/src/main/scala/kyo/test/RunConfig.scala @@ -1,5 +1,6 @@ package kyo.test +import kyo.Chunk import kyo.Duration import kyo.Maybe import kyo.minutes @@ -37,6 +38,30 @@ import kyo.minutes * how long a single leaf may run before the runner reports it as still running via `TestReporter.onLeafHeartbeat`, repeating every * interval thereafter. This makes a slow or hung leaf visible while it runs (the console reporter is silent between a leaf's start and * finish otherwise, so a hung leaf is invisible). `Duration.Infinity` disables heartbeats; defaults to 1 minute. + * @param leakCheck + * when `true` (the default), a forked test JVM runs end-of-run leak detection once all of its suites finish: it fails the run if a fiber is + * still running on the scheduler, a file descriptor opened during the run is still open, or a non-daemon thread the run started is still + * alive. Only active inside an sbt forked JVM (the one quiescent, isolated point); a no-op otherwise. This is the master switch; the four + * category toggles ([[leakCheckSockets]], [[leakCheckFileDescriptors]], [[leakCheckThreads]], [[leakCheckFibers]]) turn off one category + * while keeping the rest. Override per suite with `def config = super.config.leakCheck(false)` (all categories) or a single category toggle + * for a suite whose design legitimately holds one kind of resource for the whole run. + * @param leakCheckSockets + * when `true` (the default), socket descriptors are included in the file-descriptor probe along with files, directories, and pipes. A suite + * that drives a transport which defers a closed socket's fd release (so the fd briefly outlives the run) can turn off this one category via + * `super.config.leakCheckSockets(false)` while keeping file-descriptor, thread, and fiber detection on. + * @param leakCheckFileDescriptors + * when `true` (the default), non-socket descriptors (files, directories, pipes) are included in the file-descriptor probe. + * @param leakCheckThreads + * when `true` (the default), the non-daemon thread probe runs. + * @param leakCheckFibers + * when `true` (the default), the scheduler/fiber probe runs. + * @param leakCheckAllowlist + * substring patterns that excuse an expected long-lived resource from [[leakCheck]] without disabling the whole check. A fiber finding is + * excused if any pattern appears in the offending worker's full stack; a thread finding if any pattern appears in the thread's name or + * stack; a descriptor finding if any pattern appears in the descriptor's target (e.g. a file path). A socket's target is an opaque + * `socket:[inode]` with a per-run inode, so it has no stable substring to match; a suite whose fork leaves sockets open uses + * [[leakCheckSockets]]`(false)` instead. Prefer the allowlist over disabling when an expected resource is identifiable: the suite keeps + * detecting every other leak. * @see * `kyo.test.runner.TestRunner.runReport` which accepts a RunConfig as its second argument * @see @@ -57,7 +82,13 @@ final case class RunConfig( countOnly: Boolean = false, listOnly: Boolean = false, failOnNoAssertion: Boolean = true, - heartbeatInterval: Duration = 1.minutes + heartbeatInterval: Duration = 1.minutes, + leakCheck: Boolean = true, + leakCheckSockets: Boolean = true, + leakCheckFileDescriptors: Boolean = true, + leakCheckThreads: Boolean = true, + leakCheckFibers: Boolean = true, + leakCheckAllowlist: Chunk[String] = Chunk.empty ) derives CanEqual: /** Returns a copy with the given reporter installed. */ @@ -108,6 +139,35 @@ final case class RunConfig( */ def heartbeatInterval(heartbeatInterval: Duration): RunConfig = copy(heartbeatInterval = heartbeatInterval) + /** Returns a copy with end-of-run leak detection enabled or disabled. This is the master switch: when `false`, none of the category probes + * (sockets, file descriptors, threads, fibers) run for this suite. Prefer a single category toggle below, or [[leakCheckAllowlist]], over + * disabling everything when only a specific category or resource needs excusing. + */ + def leakCheck(leakCheck: Boolean): RunConfig = copy(leakCheck = leakCheck) + + /** Returns a copy with socket-descriptor leak detection enabled or disabled, leaving the other categories alone. Socket detection is on by + * default; disable it only for the specific suites that drive a transport which defers a closed socket's fd release, so every other + * category and every other suite stays checked. + */ + def leakCheckSockets(leakCheckSockets: Boolean): RunConfig = copy(leakCheckSockets = leakCheckSockets) + + /** Returns a copy with non-socket file-descriptor leak detection (files, directories, pipes) enabled or disabled, leaving the other + * categories on. + */ + def leakCheckFileDescriptors(leakCheckFileDescriptors: Boolean): RunConfig = copy(leakCheckFileDescriptors = leakCheckFileDescriptors) + + /** Returns a copy with non-daemon thread leak detection enabled or disabled, leaving the other categories on. */ + def leakCheckThreads(leakCheckThreads: Boolean): RunConfig = copy(leakCheckThreads = leakCheckThreads) + + /** Returns a copy with fiber (scheduler-still-busy) leak detection enabled or disabled, leaving the other categories on. */ + def leakCheckFibers(leakCheckFibers: Boolean): RunConfig = copy(leakCheckFibers = leakCheckFibers) + + /** Returns a copy with the given allowlist patterns ADDED to the existing ones (additive, so `super.config.leakCheckAllowlist(...)` + * accumulates). A fiber, thread, or descriptor leak whose stack, thread name, or descriptor target contains any pattern is excused from + * [[leakCheck]]. + */ + def leakCheckAllowlist(patterns: String*): RunConfig = copy(leakCheckAllowlist = leakCheckAllowlist ++ Chunk.from(patterns)) + end RunConfig object RunConfig: diff --git a/kyo-test/api/shared/src/main/scala/kyo/test/TestReport.scala b/kyo-test/api/shared/src/main/scala/kyo/test/TestReport.scala index 460812f4b0..e974d921de 100644 --- a/kyo-test/api/shared/src/main/scala/kyo/test/TestReport.scala +++ b/kyo-test/api/shared/src/main/scala/kyo/test/TestReport.scala @@ -116,6 +116,19 @@ end TestReport * all leaf results for this suite, in execution order * @param duration * wall-clock time from suite start to suite complete + * @param leakCheck + * the suite's effective `RunConfig.leakCheck` master setting, carried here so the run-level end-of-run leak check (performed once per forked + * JVM after all suites finish) can honor each suite's override + * @param leakCheckSockets + * the suite's effective `RunConfig.leakCheckSockets` setting, aggregated across the fork's suites to gate the socket-descriptor probe + * @param leakCheckFileDescriptors + * the suite's effective `RunConfig.leakCheckFileDescriptors` setting, aggregated to gate the non-socket descriptor probe + * @param leakCheckThreads + * the suite's effective `RunConfig.leakCheckThreads` setting, aggregated to gate the thread probe + * @param leakCheckFibers + * the suite's effective `RunConfig.leakCheckFibers` setting, aggregated to gate the fiber probe + * @param leakCheckAllowlist + * the suite's effective `RunConfig.leakCheckAllowlist`, unioned across the fork's suites by the leak check * @see * [[kyo.test.TestReport]] which collects multiple SuiteReports into the run-level aggregate * @see @@ -128,7 +141,13 @@ end TestReport final case class SuiteReport( name: String, leafResults: Chunk[(Chunk[String], TestResult)], - duration: Duration + duration: Duration, + leakCheck: Boolean = true, + leakCheckSockets: Boolean = true, + leakCheckFileDescriptors: Boolean = true, + leakCheckThreads: Boolean = true, + leakCheckFibers: Boolean = true, + leakCheckAllowlist: Chunk[String] = Chunk.empty ) derives CanEqual /** Static description of a leaf test, passed to reporters before and after execution. diff --git a/kyo-test/runner/jvm/src/main/scala/kyo/test/runner/internal/LeakCheck.scala b/kyo-test/runner/jvm/src/main/scala/kyo/test/runner/internal/LeakCheck.scala new file mode 100644 index 0000000000..23f3340cae --- /dev/null +++ b/kyo-test/runner/jvm/src/main/scala/kyo/test/runner/internal/LeakCheck.scala @@ -0,0 +1,374 @@ +package kyo.test.runner.internal + +import java.nio.file.Files +import java.nio.file.Paths +import java.util.concurrent.locks.LockSupport +import kyo.Chunk +import kyo.Maybe +import kyo.scheduler.Scheduler +import scala.jdk.CollectionConverters.* + +/** JVM end-of-run leak probes, read at the sbt `done()` boundary after every suite in the fork has finished. + * + * Three process-global resources are sampled, never per-leaf: the open file descriptors, the kyo scheduler, and the JVM's non-daemon + * threads. At `done()` the fork is quiescent (all leaves joined), so a descriptor still open, a fiber still runnable, or a non-daemon thread + * still alive is one a leaf failed to release. The probes only read existing surfaces (`/proc/self/fd`, `Scheduler.get`, + * `Thread.getAllStackTraces`); nothing in the scheduler or core changes. + * + * Detectable: descriptor leaks (a socket, pipe, or file open at `done()` that was not open at construction and is not a classpath jar or JVM + * internal; identified precisely by enumerating and reading the `/proc/self/fd` symlinks, so there is no count tolerance); runnable/spinning + * fiber leaks (a fiber pegging or repeatedly rescheduling onto a worker, the class the async-merge spinning-producer bug produced); and + * non-daemon thread leaks (a raw `Thread` or un-shutdown executor that keeps the JVM from exiting cleanly; the scheduler's own threads are + * daemons, so they never trip this). Not detectable here: a fiber parked on a still-reachable promise/channel is off-scheduler and invisible + * to scheduler status; catching that would need a core registry. The descriptor probe is Linux-only (`/proc/self/fd`); a no-op elsewhere. + */ +private[runner] object LeakCheck: + + /** Built-in allowlist patterns applied by [[detect]] in addition to each suite's `RunConfig.leakCheckAllowlist`, for process-lifetime infra + * that legitimately outlives every test in the fork. + * + * `NioIoDriver` is allowlisted pending the network stack rewrite. kyo-http's process-wide IO transport (`HttpPlatformTransport.transport`) + * is a lazy singleton whose `NioIoDriver` runs a selector event loop on a scheduler fiber for the JVM's lifetime, and nothing ever closes + * the shared transport, so the fiber is parked in `select()` at every http-using module's end-of-run check. It is intentional infra, not a + * leak; excusing it here covers every http-touching test module in one place. The entry is removed once the network stack gives the driver + * a proper lifecycle (or moves its loop off the scheduler). + */ + val defaultAllowlist: Chunk[String] = Chunk("NioIoDriver") + + /** The set of open file descriptors, each as its `/proc/self/fd` symlink target (`socket:[inode]`, `pipe:[inode]`, a file path, a `.jar`, + * ...). `Absent` on a platform without `/proc/self/fd` (macOS, Windows), where the descriptor probe is a no-op. The descriptor that the + * enumeration itself opens (the directory stream) targets `/proc/.../fd` and is filtered by [[benignFd]], so it never reads as a leak. + */ + def openFdTargets(): Maybe[Set[String]] = + val dir = Paths.get("/proc/self/fd") + if !Files.isDirectory(dir) then Maybe.empty + else + val targets = Set.newBuilder[String] + val stream = Files.newDirectoryStream(dir) + try + stream.forEach { entry => + val target = + try Files.readSymbolicLink(entry).toString + catch case _: Throwable => "" // the fd closed between listing and readlink; ignore + targets += target + } + finally stream.close() + end try + Maybe(targets.result()) + end if + end openFdTargets + + /** True for a descriptor target that is legitimately open for the JVM's lifetime regardless of any test: a classpath jar, a native + * library, a device or proc/sys pseudo-file, a JVM-internal anonymous inode (epoll, eventfd), or the runtime image. These are excluded + * from the descriptor diff so that lazy classloading (which opens jar handles as suites load classes) is never reported as a leak. + */ + def benignFd(target: String): Boolean = + target.endsWith(".jar") || target.contains(".so") || + target.startsWith("/dev/") || target.startsWith("/proc/") || target.startsWith("/sys/") || + target.startsWith("anon_inode:") || target.startsWith("/modules/") || target == "" + + /** Average scheduler load across active workers: queued plus executing tasks per worker. `0.0` when fully idle. */ + def loadAvg(): Double = Scheduler.get.loadAvg() + + /** Snapshot of currently-live non-daemon threads, by identity. Captured as a baseline at runner construction (so the JVM's own infra + * threads ; `main`, the sbt ForkMain reader ; are excluded), then diffed at `done()`. + */ + def liveNonDaemonThreads(): Set[Thread] = + Thread.getAllStackTraces.keySet.asScala.iterator.filter(t => t.isAlive && !t.isDaemon).toSet + + // sbt runs each suite task on a thread from its own ForkMain executor (`pool-N-thread-M`), which is non-daemon and stays + // parked between tasks; at `done()` those idle harness threads would look identical to a leaked test thread. They are + // identified structurally rather than by name: every `execute()` runs ON one of them, so registering the carrier thread at + // the top of each suite execution records exactly sbt's pool, with no pattern that a real test thread pool could collide + // with. Process-global because there is one fork JVM; the caller only registers when leak detection is active (forked), so + // it never accumulates in the long-lived main sbt JVM. + private val carrierThreads: java.util.Set[Thread] = + java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap[Thread, java.lang.Boolean]()) + + /** Records the calling thread as sbt harness infrastructure, excluding it from [[leakedNonDaemonThreads]]. Called at the top of each + * suite execution, where the calling thread is the sbt ForkMain pool thread carrying that task. + */ + def registerCarrierThread(): Unit = + carrierThreads.add(Thread.currentThread): Unit + + /** Live non-daemon threads not present in `baseline`, not a registered sbt carrier thread, not the calling thread, and not allowlisted: + * threads a test started and left running, which a raw `Thread` or an un-shutdown executor produces and which block a clean JVM exit. A + * thread is allowlisted if any pattern appears in its name or any of its stack frames. Each entry is the thread name plus its top frame. + */ + def leakedNonDaemonThreads(baseline: Set[Thread], allowlist: Chunk[String]): Chunk[String] = + val self = Thread.currentThread + val out = Chunk.newBuilder[String] + Thread.getAllStackTraces.asScala.foreach { case (t, st) => + if t.isAlive && !t.isDaemon && (t ne self) && !baseline.contains(t) && !carrierThreads.contains(t) then + val allowlisted = allowlist.exists(p => t.getName.contains(p) || st.exists(_.toString.contains(p))) + if !allowlisted then + // Report the thread's state and full stack, not just the top frame: a leaked non-daemon thread blocks a clean + // JVM exit, and the stack (what it is parked on or looping in) is what a CI reader needs to trace it back to the + // test that started it. The top frame alone is usually an opaque park/wait. + val stack = if st.nonEmpty then st.iterator.take(30).map(f => s" at $f").mkString("\n") else " " + out += s"${t.getName} (${t.getState})\n$stack" + end if + } + out.result() + end leakedNonDaemonThreads + + /** A stack frame of a worker that currently holds work (`load > 0`), if any: identifies where a still-running fiber is executing. Reads + * the full status (captures worker stack traces), so call it only when reporting, not in a tight poll. + */ + def busyWorkerFrame(): Maybe[String] = + val workers = Scheduler.get.status().workers + var i = 0 + var res: Maybe[String] = Maybe.empty + while i < workers.length && res.isEmpty do + val w = workers(i) + if (w ne null) && w.load > 0 && (w.frame ne null) then res = Maybe(w.frame) + i += 1 + end while + res + end busyWorkerFrame + + /** The full stack of a worker that currently holds work (`load > 0`), joined into one string, for allowlist matching. The scheduler's + * `WorkerStatus.frame` is only the top frame, and the top frame of a blocked fiber is an OS-specific syscall (`EPoll.wait` on Linux, + * `KQueue` on macOS); a allowlist needs a stable kyo frame deeper in the stack, so this reads the worker's mount thread's full stack via + * `Thread.getAllStackTraces` (keyed by the mount thread name in the status). `Absent` when no worker is busy. + */ + def busyWorkerStack(): Maybe[String] = + val workers = Scheduler.get.status().workers + var i = 0 + var mount: Maybe[String] = Maybe.empty + while i < workers.length && mount.isEmpty do + val w = workers(i) + if (w ne null) && w.load > 0 && (w.mount ne null) && w.mount.nonEmpty then mount = Maybe(w.mount) + i += 1 + end while + mount.flatMap { name => + var res: Maybe[String] = Maybe.empty + Thread.getAllStackTraces.asScala.foreach { case (t, st) => + if res.isEmpty && t.getName == name then res = Maybe(st.mkString("\n")) + } + res + } + end busyWorkerStack + + /** A thread dump of every thread that is actually doing something at probe time, for an actionable fiber-leak report: each thread whose + * state is `RUNNABLE` or whose stack runs kyo code, with its name, state, and stack. Unlike [[busyWorkerStack]] (which sees only scheduler + * workers) this also captures NON-worker threads, e.g. a caller stuck mid-`offer` that holds a queue's race-repair counter while a worker + * spins in `close()` waiting for it. Idle pool/parked threads with no kyo frame are filtered out to keep the report focused. The leak + * check's own thread is excluded. + */ + def runningThreadsDump(): String = + val self = Thread.currentThread() + val sb = new StringBuilder + Thread.getAllStackTraces.asScala.toList + .filter { (t, st) => + (t ne self) && st.nonEmpty && + ((t.getState eq Thread.State.RUNNABLE) || st.exists(_.getClassName.startsWith("kyo."))) + } + .sortBy((t, _) => t.getName) + .foreach { (t, st) => + sb.append(s"\n \"${t.getName}\" ${t.getState}\n") + st.iterator.take(30).foreach(f => sb.append(s" at $f\n")) + } + sb.toString + end runningThreadsDump + + /** Outcome of [[awaitSchedulerIdle]]. */ + enum IdleResult derives CanEqual: + case Idle + case Busy(loadAvg: Double, frame: Maybe[String]) + + /** Polls the scheduler until its load has been `0.0` continuously for `settleNanos`, or until `budgetNanos` elapses. + * + * The settle window lets transient tail activity (a reporter fiber, a finalizer) drain before a verdict, so only work that persists past + * the budget is reported as `Busy`. Blocking by design: called at the sbt `done()` boundary, outside any fiber, so parking the caller is + * correct here rather than an `Async` suspension. + */ + def awaitSchedulerIdle(budgetNanos: Long, settleNanos: Long, pollNanos: Long): IdleResult = + val deadline = System.nanoTime() + budgetNanos + var idleSince: Long = -1L + var result: Maybe[IdleResult] = Maybe.empty + while result.isEmpty && System.nanoTime() < deadline do + val now = System.nanoTime() + if loadAvg() == 0.0 then + if idleSince < 0 then idleSince = now + else if now - idleSince >= settleNanos then result = Maybe(IdleResult.Idle) + else idleSince = -1L + end if + if result.isEmpty then LockSupport.parkNanos(pollNanos) + end while + result.getOrElse { + if loadAvg() == 0.0 then IdleResult.Idle + else IdleResult.Busy(loadAvg(), busyWorkerFrame()) + } + end awaitSchedulerIdle + + /** True when running inside an sbt forked test JVM (`sbt.ForkMain`), the only JVM where end-of-run leak detection is both sound (the fork + * holds only this run's resources) and safe to fail by exit (failing the main sbt JVM would take sbt down). Two agreeing signals: the + * `sun.java.command` property and an `sbt.ForkMain` frame on the `main` thread. Defaults to `false` on any ambiguity, because the verdict + * gates an irreversible JVM-failing action. + */ + def isForked: Boolean = + Option(System.getProperty("sun.java.command")).exists(_.startsWith("sbt.ForkMain")) || + Thread.getAllStackTraces.asScala.exists { (t, st) => + (t.getName == "main") && st.exists(_.getClassName.startsWith("sbt.ForkMain")) + } + + /** Descriptor targets open at `done()` that were not open at construction and are neither benign ([[benignFd]]) nor allowlisted: a socket, + * pipe, or file a leaf opened and never closed. Diffing against the baseline excludes the fork's own startup descriptors, like the + * `sbt.ForkMain` socket back to the main JVM, which is open the whole run. + */ + def fdLeaks(baseline: Set[String], current: Set[String], allowlist: Chunk[String]): Chunk[String] = + val out = Chunk.newBuilder[String] + current.foreach { target => + if !baseline.contains(target) && !benignFd(target) && !allowlist.exists(target.contains) then + out += target + } + out.result() + end fdLeaks + + /** Restricts descriptor leaks to the enabled descriptor categories: a socket target (`socket:[inode]`) is kept only when `checkSockets` is + * on, every other target (files, directories, pipes) only when `checkFileDescriptors` is on. Lets a suite exempt the socket category while + * still detecting file-descriptor leaks (e.g. an unclosed `Files.list` directory stream). + */ + def fdLeaksForCategories(leaks: Chunk[String], checkSockets: Boolean, checkFileDescriptors: Boolean): Chunk[String] = + leaks.filter(target => if target.startsWith("socket:[") then checkSockets else checkFileDescriptors) + + private val tcpStates = Map( + "01" -> "ESTABLISHED", + "02" -> "SYN_SENT", + "03" -> "SYN_RECV", + "04" -> "FIN_WAIT1", + "05" -> "FIN_WAIT2", + "06" -> "TIME_WAIT", + "07" -> "CLOSE", + "08" -> "CLOSE_WAIT", + "09" -> "LAST_ACK", + "0A" -> "LISTEN", + "0B" -> "CLOSING" + ) + + /** For a `socket:[inode]` target, resolves the connection's TCP state and local/remote ports from `/proc/net/tcp{,6}`, so a leaked socket + * is actionable rather than an opaque inode: e.g. `CLOSE_WAIT` means the peer closed and this side held the connection open, and the ports + * say which side it is (an ephemeral local port to a server's remote port is a client connection). Returns "" for a non-socket target or an + * inode that cannot be resolved. + */ + def describeSocket(target: String): String = + if !target.startsWith("socket:[") then "" + else + val inode = target.stripPrefix("socket:[").stripSuffix("]") + def scan(path: String): Maybe[String] = + try + val lines = java.nio.file.Files.readAllLines(Paths.get(path)).asScala + var res: Maybe[String] = Maybe.empty + lines.foreach { line => + val f = line.trim.split("\\s+") + // columns: sl local rem st ... inode (index 9); the header row has no numeric inode at f(9) + if res.isEmpty && f.length > 9 && f(9) == inode then + val st = tcpStates.getOrElse(f(3).toUpperCase, f(3)) + val lp = Integer.parseInt(f(1).split(":")(1), 16) + val rp = Integer.parseInt(f(2).split(":")(1), 16) + res = Maybe(s" [$st local:$lp remote:$rp]") + end if + } + res + catch case _: Throwable => Maybe.empty + scan("/proc/net/tcp").orElse(scan("/proc/net/tcp6")).getOrElse("") + end describeSocket + + /** Process-global resource snapshot taken once at runner construction, before any suite runs, and diffed at `done()`. Captures the open + * descriptor targets and the set of live non-daemon threads so the JVM's own startup infrastructure (the `main` thread, the ForkMain + * reader and socket) is excluded from the diff. + */ + final case class Baseline(fds: Maybe[Set[String]], threads: Set[Thread]) + + /** Captures a [[Baseline]] of the current open descriptors and live non-daemon threads. */ + def baseline(): Baseline = Baseline(openFdTargets(), liveNonDaemonThreads()) + + /** Runs the enabled end-of-run probes against `baseline`, excusing any finding matched by `allowlist`, and returns a leak report or `Absent` + * when the fork is clean. The four `check*` flags gate the categories independently (a suite can exempt just sockets, say, and still detect + * file-descriptor, thread, and fiber leaks); the scheduler settle still runs whenever any category is enabled. + * + * Order: the scheduler/fiber probe first (it owns the settle window), then a `System.gc()` plus settle so Cleaner-closed abandoned + * channels and finished threads drop out before the descriptor and thread diffs (a genuine leak stays referenced and survives the gc, so + * this trims false positives without hiding real leaks). The fiber probe matches the allowlist against the busy worker's full stack so an + * OS-independent kyo frame can excuse an expected event loop; the descriptor probe enumerates `/proc/self/fd` and reports the exact + * leaked targets with no count tolerance. + */ + def detect( + baseline: Baseline, + allowlist: Chunk[String], + checkFibers: Boolean, + checkThreads: Boolean, + checkFileDescriptors: Boolean, + checkSockets: Boolean, + idleBudgetNanos: Long, + settleNanos: Long, + pollNanos: Long + ): Maybe[String] = + val findings = Chunk.newBuilder[String] + val effectiveAllowlist = defaultAllowlist ++ allowlist + + // Always settle on scheduler quiescence first: it lets in-flight fibers finish and release their resources before the thread and + // descriptor diffs run, which trims false positives for every category. Record a fiber finding only when that category is enabled. + awaitSchedulerIdle(idleBudgetNanos, settleNanos, pollNanos) match + case IdleResult.Idle => () + case IdleResult.Busy(la, frame) => + if checkFibers then + val stack = busyWorkerStack().getOrElse(frame.getOrElse("")) + val allowlisted = effectiveAllowlist.exists(stack.contains) + if !allowlisted then + findings += s"fiber leak: scheduler still busy (loadAvg=$la) after settle; running at ${frame.getOrElse("")}" + + s"\n busy worker stack:\n$stack" + + s"\n all running threads (worker and non-worker) at probe time:${runningThreadsDump()}" + end if + end match + + System.gc() + LockSupport.parkNanos(settleNanos) + + if checkThreads then + val threadLeaks = leakedNonDaemonThreads(baseline.threads, effectiveAllowlist) + if threadLeaks.nonEmpty then + findings += s"non-daemon thread leak (${threadLeaks.size}): ${threadLeaks.mkString("; ")}" + end if + + if checkFileDescriptors || checkSockets then + baseline.fds match + case Maybe.Present(before) => + // A descriptor may be mid-close at done(): a client connection closes asynchronously while it processes the + // server's FIN (EOF -> pump teardown -> channel close). Require a descriptor to remain leaked across a second + // settle so an in-flight close is not mistaken for a leak; a genuinely leaked descriptor never closes and so + // survives the recheck. (Safe: this can only drop descriptors that closed during the window, never a real leak.) + def leaksNow(): Chunk[String] = + val raw = openFdTargets().map(fdLeaks(before, _, effectiveAllowlist)).getOrElse(Chunk.empty) + fdLeaksForCategories(raw, checkSockets, checkFileDescriptors) + val first = leaksNow() + if first.nonEmpty then + LockSupport.parkNanos(settleNanos) + val second = leaksNow() + val persistent = first.filter(second.contains) + if persistent.nonEmpty then + val described = persistent.map(t => t + describeSocket(t)) + findings += s"file-descriptor leak (${persistent.size}): ${described.mkString("; ")}" + end if + case Maybe.Absent => () // /proc/self/fd unavailable: descriptor probe is a no-op on this platform. + end match + end if + + val all = findings.result() + if all.isEmpty then Maybe.empty + else Maybe(all.mkString("\n - ", "\n - ", "")) + end detect + + /** Thrown from the forked runner's `done()` when [[detect]] finds a leak. Failing by exception is what marks the forked test task failed; + * sbt surfaces the message as a `ForkMain$ForkError`. + */ + final class Detected(report: String) + extends RuntimeException( + s"kyo-test leak check failed:$report\n\nThese resources outlived the test run; a leaked fiber, thread, or descriptor means a " + + "test (or the code under test) did not release a resource. Disable for a suite with " + + "`override def config = super.config.leakCheck(false)`, or excuse one expected resource with " + + "`super.config.leakCheckAllowlist(\"\")`." + ) + +end LeakCheck diff --git a/kyo-test/runner/jvm/src/main/scala/kyo/test/runner/internal/SbtRunner.scala b/kyo-test/runner/jvm/src/main/scala/kyo/test/runner/internal/SbtRunner.scala index 6397e87eb1..3250d2173a 100644 --- a/kyo-test/runner/jvm/src/main/scala/kyo/test/runner/internal/SbtRunner.scala +++ b/kyo-test/runner/jvm/src/main/scala/kyo/test/runner/internal/SbtRunner.scala @@ -57,6 +57,16 @@ final private[runner] class SbtRunner( private val results = new java.util.concurrent.ConcurrentLinkedQueue[TestReport]() + // End-of-run leak detection runs once per forked test JVM, the one place the probe is both sound (the fork holds only this + // run's resources) and safe to fail by exit. Enablement and the allowlist are per-suite RunConfig (default on), carried on + // each SuiteReport and aggregated at done(); the fork check is resolved once here (cheap: `sun.java.command` is set at JVM + // launch). The baseline is captured now, in the constructor, before any suite runs, so the diff at done() excludes the JVM's + // own startup descriptors and threads (including the sbt.ForkMain socket). In the main sbt JVM `forked` is false: no + // baseline, no carrier tracking, no check (the diff would be polluted by sbt's own resources and a throw would fail sbt). + private val forked = LeakCheck.isForked + private val leakBaseline = if forked then LeakCheck.baseline() else LeakCheck.Baseline(kyo.Maybe.empty, Set.empty) + private val leakCheckRan = new java.util.concurrent.atomic.AtomicBoolean(false) + // Populated on first tasks() invocation. SuiteDiscovery scans the META-INF/services file // and surfaces classloader / non-TestBase failures so they end up in Summary's warning line. private[runner] val discoveryErrors: AtomicReference[Chunk[String]] = @@ -66,17 +76,55 @@ final private[runner] class SbtRunner( parsedArgs match case Args.Result.Ok(_) => discoveryErrors.set(SuiteDiscovery.discoverDetailed(testClassLoader).errors) - taskDefs.map(td => new SbtTask(td, baseConfig, testClassLoader, results)) + taskDefs.map(td => new SbtTask(td, baseConfig, testClassLoader, results, forked)) case _ => Array.empty def done(): String = + runLeakCheck() parsedArgs match case Args.Result.Error(msg) => msg case Args.Result.Help => "" case Args.Result.Ok(_) => import scala.jdk.CollectionConverters.* Summary.render(results.asScala, discoveryErrors.get(), positionalArgs) + end match end done + /** Runs the end-of-run leak probes once, only inside a forked test JVM, and throws [[LeakCheck.Detected]] on a leak so sbt fails the test + * task. The leak settings are aggregated from the suites that ran in this fork (each [[TestReport]] carries its suite's effective + * `leakCheck` and `leakCheckAllowlist`): the check runs if any suite enabled it, against the union of their allowlists. sbt calls `done()` + * more than once per forked runner (once after execution, once from a shutdown hook), so the compare-and-set guard fires the probes and + * any failure exactly once. Outside a fork (the main sbt JVM) `forked` is false, so this is a no-op. + */ + private def runLeakCheck(): Unit = + if forked && leakCheckRan.compareAndSet(false, true) then + import scala.jdk.CollectionConverters.* + val suites = results.asScala.flatMap(_.suiteReports) + val allowlist = Chunk.from(suites.flatMap(_.leakCheckAllowlist)).distinct + // Each category runs if any suite in the fork enabled it (master on AND that category on); a suite exempts a category by + // turning just that one off, so the fork keeps detecting the rest. To exempt a category fork-wide, every suite must opt out, + // which is why the per-category toggles live on the shared suite base (e.g. BaseHttpTest disables only sockets). + val checkFibers = suites.exists(s => s.leakCheck && s.leakCheckFibers) + val checkThreads = suites.exists(s => s.leakCheck && s.leakCheckThreads) + val checkFileDescriptors = suites.exists(s => s.leakCheck && s.leakCheckFileDescriptors) + val checkSockets = suites.exists(s => s.leakCheck && s.leakCheckSockets) + if checkFibers || checkThreads || checkFileDescriptors || checkSockets then + LeakCheck.detect( + leakBaseline, + allowlist = allowlist, + checkFibers = checkFibers, + checkThreads = checkThreads, + checkFileDescriptors = checkFileDescriptors, + checkSockets = checkSockets, + idleBudgetNanos = 2_000_000_000L, + settleNanos = 200_000_000L, + pollNanos = 10_000_000L + ) match + case kyo.Maybe.Present(report) => throw new LeakCheck.Detected(report) + case kyo.Maybe.Absent => () + end match + end if + end runLeakCheck + end SbtRunner diff --git a/kyo-test/runner/jvm/src/main/scala/kyo/test/runner/internal/SbtTask.scala b/kyo-test/runner/jvm/src/main/scala/kyo/test/runner/internal/SbtTask.scala index 6d66dd5da4..cde9d8e9a0 100644 --- a/kyo-test/runner/jvm/src/main/scala/kyo/test/runner/internal/SbtTask.scala +++ b/kyo-test/runner/jvm/src/main/scala/kyo/test/runner/internal/SbtTask.scala @@ -27,7 +27,8 @@ final private[internal] class SbtTask( val taskDef: TaskDef, baseConfig: RunConfig, testClassLoader: ClassLoader, - results: java.util.concurrent.ConcurrentLinkedQueue[TestReport] + results: java.util.concurrent.ConcurrentLinkedQueue[TestReport], + forked: Boolean ) extends Task: def tags(): Array[String] = Array.empty @@ -36,6 +37,9 @@ final private[internal] class SbtTask( eventHandler: EventHandler, loggers: Array[Logger] ): Array[Task] = + // This call runs on the sbt ForkMain pool thread carrying the task; record it as harness infrastructure so the + // end-of-run thread probe never mistakes a parked sbt worker for a leaked test thread (see LeakCheck.registerCarrierThread). + if forked then LeakCheck.registerCarrierThread() val report = runSuite() results.add(report) emitEvents(report, eventHandler) diff --git a/kyo-test/runner/jvm/src/test/scala/kyo/test/runner/LeakCheckTest.scala b/kyo-test/runner/jvm/src/test/scala/kyo/test/runner/LeakCheckTest.scala new file mode 100644 index 0000000000..24e5af2f13 --- /dev/null +++ b/kyo-test/runner/jvm/src/test/scala/kyo/test/runner/LeakCheckTest.scala @@ -0,0 +1,180 @@ +package kyo.test.runner + +import java.nio.channels.FileChannel +import java.nio.file.StandardOpenOption +import java.util.concurrent.atomic.AtomicBoolean +import kyo.* +import kyo.test.runner.internal.LeakCheck +import org.scalatest.NonImplicitAssertions +import org.scalatest.funsuite.AnyFunSuite + +/** Validates the three end-of-run leak probes: the descriptor diff logic ([[LeakCheck.fdLeaks]] / [[LeakCheck.benignFd]], pure so it runs on + * every platform) plus a Linux-only `/proc/self/fd` enumeration check, a deliberately leaked spinning fiber, and a deliberately leaked + * non-daemon thread, confirming each signal clears after cleanup. Plain ScalaTest (not self-hosted via kyo-test) so the leaks can be driven + * and torn down from outside any fiber. + */ +class LeakCheckTest extends AnyFunSuite with NonImplicitAssertions: + + private def minLoad(samples: Int): Double = + var min = Double.MaxValue + var i = 0 + while i < samples do + Thread.sleep(20) + val l = LeakCheck.loadAvg() + if l < min then min = l + i += 1 + end while + min + end minLoad + + /** Polls `cond` every 10ms up to `timeoutMs`, returning whether it became true. Used to wait for a fiber to reach an observable scheduler + * state instead of guessing with a fixed sleep, so the scheduler probe test does not race fiber startup. + */ + private def awaitTrue(timeoutMs: Long)(cond: => Boolean): Boolean = + val deadline = java.lang.System.nanoTime() + timeoutMs * 1_000_000L + var ok = cond + while !ok && java.lang.System.nanoTime() < deadline do + Thread.sleep(10) + ok = cond + ok + end awaitTrue + + test("benignFd excludes classpath/library/JVM-internal targets, keeps sockets/pipes/files") { + assert(LeakCheck.benignFd("/home/u/.ivy2/cache/io.getkyo/kyo-core.jar")) + assert(LeakCheck.benignFd("/usr/lib/x86_64-linux-gnu/libc.so.6")) + assert(LeakCheck.benignFd("/dev/urandom")) + assert(LeakCheck.benignFd("/proc/self/fd")) + assert(LeakCheck.benignFd("anon_inode:[eventpoll]")) + assert(!LeakCheck.benignFd("socket:[5]")) + assert(!LeakCheck.benignFd("pipe:[7]")) + assert(!LeakCheck.benignFd("/tmp/data.txt")) + } + + test("fdLeaks reports only new, non-benign, non-allowlisted descriptors") { + val baseline = Set("socket:[1]", "/app/lib/foo.jar", "pipe:[2]") + val current = Set( + "socket:[1]", // in baseline -> not a leak (e.g. the sbt.ForkMain socket) + "/app/lib/foo.jar", // baseline jar + "/app/lib/new.jar", // new but benign (classloader jar) + "anon_inode:[eventpoll]", // new but benign (JVM epoll) + "/dev/random", // new but benign + "socket:[99]", // NEW socket -> leak + "/tmp/leaked.txt", // NEW file -> leak + "/tmp/excused.txt" // NEW file but allowlisted + ) + val leaks = LeakCheck.fdLeaks(baseline, current, Chunk("excused")) + assert(leaks.toSet == Set("socket:[99]", "/tmp/leaked.txt"), s"got $leaks") + } + + test("fdLeaksForCategories keeps only the enabled descriptor categories") { + val leaks = Chunk("socket:[99]", "/tmp/leaked.txt", "pipe:[7]") + // both categories on: every leak kept + assert(LeakCheck.fdLeaksForCategories(leaks, checkSockets = true, checkFileDescriptors = true).toSet == leaks.toSet) + // sockets off (e.g. BaseHttpTest): socket dropped, non-socket descriptors still reported + assert( + LeakCheck.fdLeaksForCategories(leaks, checkSockets = false, checkFileDescriptors = true).toSet == + Set("/tmp/leaked.txt", "pipe:[7]") + ) + // file descriptors off: only the socket remains + assert(LeakCheck.fdLeaksForCategories(leaks, checkSockets = true, checkFileDescriptors = false).toSet == Set("socket:[99]")) + // both off: nothing reported + assert(LeakCheck.fdLeaksForCategories(leaks, checkSockets = false, checkFileDescriptors = false).isEmpty) + } + + test("openFdTargets enumerates real descriptors and the diff clears on close (Linux only)") { + LeakCheck.openFdTargets() match + case Maybe.Absent => + cancel("/proc/self/fd unavailable on this OS; descriptor probe is a no-op here") + case Maybe.Present(before) => + val tmp = java.io.File.createTempFile("kyo-leak-fd", ".tmp") + tmp.deleteOnExit() + val name = tmp.getName + val ch = FileChannel.open(tmp.toPath, StandardOpenOption.READ) + try + val after = LeakCheck.openFdTargets().getOrElse(fail("openFdTargets became Absent mid-test")) + val leaks = LeakCheck.fdLeaks(before, after, Chunk.empty) + assert(leaks.exists(_.contains(name)), s"the open temp-file descriptor should be reported as a leak; got $leaks") + finally ch.close() + end try + val restored = LeakCheck.openFdTargets().getOrElse(fail("openFdTargets became Absent mid-test")) + assert( + !LeakCheck.fdLeaks(before, restored, Chunk.empty).exists(_.contains(name)), + "after close the temp-file descriptor must no longer be reported" + ) + end match + } + + test("scheduler probe reports Busy with a frame while a fiber spins, and drains after cleanup") { + import kyo.AllowUnsafe.embrace.danger + val stop = new AtomicBoolean(false) + // A single CPU-bound thunk that pegs one worker thread until the flag flips: it allocates nothing (unlike a tight + // kyo Loop, which mints a Loop.Outcome per iteration and exhausts the fork's heap), modelling the spinning-producer + // leak's effect on the scheduler (a worker stuck at 100% with currentTask set, so load stays >= 1). + val spinner: Unit < Sync = + Sync.defer { + var x = 0L + while !stop.get() do x += 1 + if x < 0 then throw new AssertionError() // keep the loop live (x is read) + } + val ambient = minLoad(8) + val fiber = Sync.Unsafe.evalOrThrow(Fiber.initUnscoped(spinner)) + // Wait until the spinner is actually mounted and observed as busy load, instead of guessing with a fixed sleep + // (which races startup). At a real done() a leaked spinner has been running since before the check. + val observed = awaitTrue(2000)(LeakCheck.busyWorkerFrame().isDefined) + val verdict = LeakCheck.awaitSchedulerIdle( + budgetNanos = 300_000_000L, + settleNanos = 150_000_000L, + pollNanos = 10_000_000L + ) + // Tear the spinner down BEFORE asserting so a failed assertion never leaves it pegging a worker. + stop.set(true) + val _ = Sync.Unsafe.evalOrThrow(fiber.interrupt) + Thread.sleep(200) + val drained = minLoad(10) + + assert(observed, s"the spinner should be observed as busy load within 2s (ambient=$ambient)") + verdict match + case LeakCheck.IdleResult.Busy(la, frame) => + assert(la > 0.0, s"a spinning fiber should keep load above zero (ambient=$ambient)") + assert(frame.isDefined, "a busy worker should surface a stack frame for the spinning fiber") + case LeakCheck.IdleResult.Idle => + fail(s"expected Busy while a fiber was spinning, got Idle (ambient=$ambient)") + end match + assert(drained <= ambient + 0.5, s"after cleanup the spinner's load should be gone (ambient=$ambient drained=$drained)") + } + + test("non-daemon thread probe detects a leaked thread, respects the allowlist, and clears after join") { + val baseline = LeakCheck.liveNonDaemonThreads() + assert( + !LeakCheck.leakedNonDaemonThreads(baseline, Chunk.empty).exists(_.contains("leak-probe-thread")), + "baseline must not already contain the probe thread" + ) + val leaked = new Thread( + () => + try Thread.sleep(60000) + catch case _: InterruptedException => (), + "leak-probe-thread" + ) + leaked.setDaemon(false) + leaked.start() + try + assert( + awaitTrue(2000)(LeakCheck.leakedNonDaemonThreads(baseline, Chunk.empty).exists(_.contains("leak-probe-thread"))), + "a live non-daemon thread started after the baseline should be flagged" + ) + assert( + !LeakCheck.leakedNonDaemonThreads(baseline, Chunk("leak-probe-thread")).exists(_.contains("leak-probe-thread")), + "a thread whose name matches a allowlist pattern must be excused" + ) + finally + leaked.interrupt() + leaked.join(2000) + end try + assert(!leaked.isAlive, "probe thread should have stopped after interrupt+join") + assert( + !LeakCheck.leakedNonDaemonThreads(baseline, Chunk.empty).exists(_.contains("leak-probe-thread")), + "after the thread joined it must no longer be reported as leaked" + ) + } + +end LeakCheckTest diff --git a/kyo-test/runner/shared/src/main/scala/kyo/test/runner/TestRunner.scala b/kyo-test/runner/shared/src/main/scala/kyo/test/runner/TestRunner.scala index f58282308e..54c93420b5 100644 --- a/kyo-test/runner/shared/src/main/scala/kyo/test/runner/TestRunner.scala +++ b/kyo-test/runner/shared/src/main/scala/kyo/test/runner/TestRunner.scala @@ -203,7 +203,17 @@ object TestRunner: ) ) } - val sr = SuiteReport(suiteInfo.name, leaf ++ synthetic, duration) + val sr = SuiteReport( + suiteInfo.name, + leaf ++ synthetic, + duration, + leakCheck = effectiveConfig.leakCheck, + leakCheckSockets = effectiveConfig.leakCheckSockets, + leakCheckFileDescriptors = effectiveConfig.leakCheckFileDescriptors, + leakCheckThreads = effectiveConfig.leakCheckThreads, + leakCheckFibers = effectiveConfig.leakCheckFibers, + leakCheckAllowlist = effectiveConfig.leakCheckAllowlist + ) reporter.onSuiteComplete(suiteInfo, sr) val report = TestReport(Chunk(sr)) reporter.onRunComplete(report) @@ -215,10 +225,10 @@ object TestRunner: // failed Kyo value. A Panic is logged (never swallowed) and recorded as a failure. Abort.run[Throwable](pipeline).map { case Result.Success(report) => report - case Result.Failure(t) => constructorFailureReport(suiteInfo, reporter, t) + case Result.Failure(t) => constructorFailureReport(suiteInfo, reporter, effectiveConfig, t) case panic: Result.Panic => java.lang.System.err.println(s"[kyo-test] unexpected panic during run: ${panic.exception}") - constructorFailureReport(suiteInfo, reporter, panic.exception) + constructorFailureReport(suiteInfo, reporter, effectiveConfig, panic.exception) } end runReport @@ -522,11 +532,17 @@ object TestRunner: // ── Constructor failure ───────────────────────────────────────────────────────────────────── - private def constructorFailureReport(suiteInfo: SuiteInfo, reporter: TestReporter, t: Throwable): TestReport = + private def constructorFailureReport(suiteInfo: SuiteInfo, reporter: TestReporter, config: RunConfig, t: Throwable): TestReport = val sr = SuiteReport( suiteInfo.name, Chunk((Chunk(""), TestResult.Failed(t.toString, Maybe(t), Duration.Zero))), - Duration.Zero + Duration.Zero, + leakCheck = config.leakCheck, + leakCheckSockets = config.leakCheckSockets, + leakCheckFileDescriptors = config.leakCheckFileDescriptors, + leakCheckThreads = config.leakCheckThreads, + leakCheckFibers = config.leakCheckFibers, + leakCheckAllowlist = config.leakCheckAllowlist ) reporter.onSuiteComplete(suiteInfo, sr) val report = TestReport(Chunk(sr)) diff --git a/kyo-test/runner/shared/src/test/scala/kyo/test/runner/RunnerTest.scala b/kyo-test/runner/shared/src/test/scala/kyo/test/runner/RunnerTest.scala index 9c15c19333..ebd5280847 100644 --- a/kyo-test/runner/shared/src/test/scala/kyo/test/runner/RunnerTest.scala +++ b/kyo-test/runner/shared/src/test/scala/kyo/test/runner/RunnerTest.scala @@ -108,6 +108,13 @@ class RTAbortAnySuite extends TestBase[Any]: "ok" in assert(1 == 1) end RTAbortAnySuite +/** Its constructor throws before any leaf registers, exercising TestRunner.constructorFailureReport: the recovered + * SuiteReport must carry the run's effective leak config, not the SuiteReport field defaults. + */ +class RTConstructorFailSuite extends TestBase[Any]: + throw new RuntimeException("constructor boom") +end RTConstructorFailSuite + // ── AssertScope leak-capture fixtures (runner-side capture) ───────────────────────────────────── /** A detached fiber asserts false DURING the leaf body while the body's main computation would otherwise pass. @@ -368,8 +375,7 @@ class RunnerTest extends AsyncFreeSpec with NonImplicitAssertions: // SuiteReport and appends one synthetic failed leaf per leak. This collector is process-global, so clear it at the // start so a prior test cannot pollute it, and leave it empty at the end. val _ = kyo.test.AssertScope.drainLeakedAfterClose() - val frame = summon[Frame] - val origin = new kyo.test.AssertionFailed("ORIGINAL-DIAGRAM-TEXT", frame, Maybe.empty[String], Maybe.empty[Throwable]) + val origin = new kyo.test.AssertionFailed("ORIGINAL-DIAGRAM-TEXT", Frame.internal, Maybe.empty[String], Maybe.empty[Throwable]) kyo.test.AssertScope.leakedAfterClose.add((Chunk("some", "leaf"), origin)): Unit TestRunner.runToFuture(classOf[RTNoLeakSuite], RunConfig.default).map { report => // The real passing leaf survives unchanged, plus exactly one synthetic leaf for the enqueued leak. @@ -440,4 +446,20 @@ class RunnerTest extends AsyncFreeSpec with NonImplicitAssertions: } } + "Constructor failure: the recovered SuiteReport carries the run's effective leak config, not the field defaults" in { + // A suite whose constructor throws is recovered into a -failure SuiteReport. That report must + // reflect the run's effective leak settings, threaded through constructorFailureReport, rather than the + // SuiteReport field defaults. Both arms are asserted: the default (sockets on) carries through, and an + // explicit leakCheckSockets(false) is preserved (proving the field is threaded, not forced to the default). + for + defaultReport <- TestRunner.runToFuture(classOf[RTConstructorFailSuite], RunConfig.default) + socketsOffReport <- TestRunner.runToFuture(classOf[RTConstructorFailSuite], RunConfig.default.leakCheckSockets(false)) + yield + val defaultSr = defaultReport.suiteReports.head + val socketsOffSr = socketsOffReport.suiteReports.head + assert(defaultSr.leafResults.exists(_._1 == Chunk("")), "expected a failure leaf") + assert(defaultSr.leakCheckSockets, "default config: socket-leak detection is on and must carry through a constructor failure") + assert(!socketsOffSr.leakCheckSockets, "leakCheckSockets(false) must be carried into the constructor-failure report") + } + end RunnerTest diff --git a/kyo-ui/shared/src/test/scala/kyo/UITest.scala b/kyo-ui/shared/src/test/scala/kyo/UITest.scala index 454f1dd3ad..c0fe664a60 100644 --- a/kyo-ui/shared/src/test/scala/kyo/UITest.scala +++ b/kyo-ui/shared/src/test/scala/kyo/UITest.scala @@ -11,7 +11,13 @@ abstract class UITest extends kyo.test.Test[Any]: // // failOnNoAssertion is disabled because kyo-ui suites assert through Browser.assert* (domain helpers that do not // flow through the kyo.test assert macros), so the no-assertion counter sees zero. - override def config = super.config.sequential.failOnNoAssertion(false) + // + // The shared Chrome (Browser.runShared) is held for the whole run, so its CDP `socket:[inode]` and stdio + // `pipe:[inode]` are opaque-inode descriptors no allowlist can match. Disable only those two descriptor categories, + // keeping thread and fiber detection on (the kyo-http NioIoDriver fiber is built-in allowlisted). Same rationale as + // kyo-browser's BaseBrowserTest. + override def config = + super.config.sequential.failOnNoAssertion(false).leakCheckSockets(false).leakCheckFileDescriptors(false) /** Retry budget for transient Chrome-infrastructure failures: 2 retries (3 attempts total) with exponential backoff. Per-test fresh * Chrome occasionally drops its CDP connection or fails to launch under sustained full-suite load; a fresh attempt rides that out. diff --git a/kyo-website/jvm/src/test/scala/kyo/website/WebsiteBuildGraphTest.scala b/kyo-website/jvm/src/test/scala/kyo/website/WebsiteBuildGraphTest.scala index aa0e9dde09..55c8afd1c6 100644 --- a/kyo-website/jvm/src/test/scala/kyo/website/WebsiteBuildGraphTest.scala +++ b/kyo-website/jvm/src/test/scala/kyo/website/WebsiteBuildGraphTest.scala @@ -1,41 +1,46 @@ package kyo.website -import java.nio.file.Files -import java.nio.file.Paths +import kyo.* +import kyo.AllowUnsafe.embrace.danger class WebsiteBuildGraphTest extends WebsiteTest: // Locate the repo root by walking up from user.dir until we find build.sbt. - private def repoRoot(): java.nio.file.Path = - val start = Paths.get(java.lang.System.getProperty("user.dir")).toAbsolutePath - Iterator - .iterate(start)(_.getParent) - .takeWhile(_ != null) - .find(dir => Files.exists(dir.resolve("build.sbt"))) - .getOrElse(throw new RuntimeException("repo root with build.sbt not found")) + private def repoRoot(): Path = + @scala.annotation.tailrec + def loop(dir: Path): Path = + if (dir / "build.sbt").unsafe.exists() then dir + else + dir.parent match + case Maybe.Present(parent) => loop(parent) + case Maybe.Absent => throw new RuntimeException("repo root with build.sbt not found") + loop(Path(java.lang.System.getProperty("user.dir").nn)) end repoRoot // build.sbt is at the repo root. private def buildSbtLines(): List[String] = - val p = repoRoot().resolve("build.sbt") - new String(Files.readAllBytes(p)).linesIterator.toList + (repoRoot() / "build.sbt").unsafe.read().getOrThrow.linesIterator.toList // kyo-website/shared, kyo-website/js, and kyo-website-bundle source trees should have no flexmark import. private def sourceLines(subdir: String): List[String] = - val root = repoRoot().resolve(subdir) - if Files.exists(root) then - import scala.jdk.CollectionConverters.* - Files.walk(root) - .iterator() - .asScala + val root = repoRoot() / subdir + if root.unsafe.exists() then + filesUnder(root) .filter(p => p.toString.endsWith(".scala") || p.toString.endsWith(".sbt")) - .flatMap(p => new String(Files.readAllBytes(p)).linesIterator) - .toList + .flatMap(p => p.unsafe.read().getOrThrow.linesIterator.toList) else Nil end if end sourceLines + // Path.Unsafe.list collects each directory's entries and closes the stream before returning (no leaked fd); + // recurse to cover the full tree without following symlinks. + private def filesUnder(dir: Path): List[Path] = + dir.unsafe.list().getOrThrow.toList.flatMap { entry => + if entry.unsafe.isDirectory() && !entry.unsafe.isSymbolicLink() then filesUnder(entry) + else List(entry) + } + "flexmark JVM-only import grep" - { "zero flexmark matches in kyo-website/shared" in { val lines = sourceLines("kyo-website/shared") @@ -156,14 +161,12 @@ class WebsiteBuildGraphTest extends WebsiteTest: // any scala_meta symbol references and assert zero occurrences. The file is only present // after `sbt kyo-website-bundleJS/fullLinkJS`; when absent the test is cancelled (not failed). "zero scala_meta in bundle-opt main.js (skip when absent)" in { - val ver = scala.util.Properties.versionNumberString - val jsPath = repoRoot().resolve( - s"kyo-website-bundle/js/target/scala-$ver/kyo-website-bundle-opt/main.js" - ) - if !Files.exists(jsPath) then + val ver = scala.util.Properties.versionNumberString + val jsPath = repoRoot() / s"kyo-website-bundle/js/target/scala-$ver/kyo-website-bundle-opt/main.js" + if !jsPath.unsafe.exists() then cancel("Bundle-opt check skipped: bundle-opt main.js not present (run fullLinkJS first)") else - val text = new String(Files.readAllBytes(jsPath)) + val text = jsPath.unsafe.read().getOrThrow val scalaMetaCount = text.sliding("scala_meta".length).count(_ == "scala_meta") val docsMarkdownCount = text.sliding("DocsMarkdown".length).count(_ == "DocsMarkdown") assert( diff --git a/kyo-website/jvm/src/test/scala/kyo/website/WebsiteGeneratorTest.scala b/kyo-website/jvm/src/test/scala/kyo/website/WebsiteGeneratorTest.scala index 3a931415bf..c489e4d555 100644 --- a/kyo-website/jvm/src/test/scala/kyo/website/WebsiteGeneratorTest.scala +++ b/kyo-website/jvm/src/test/scala/kyo/website/WebsiteGeneratorTest.scala @@ -1,7 +1,5 @@ package kyo.website -import java.nio.file.Files -import java.nio.file.Paths import kyo.* /** Tests for `WebsiteGenerator.emit`. @@ -45,28 +43,27 @@ class WebsiteGeneratorTest extends WebsiteTest: // ---- Helpers ---- - private def tmpDir(using Frame): Path < Sync = - Sync.defer(Path(Files.createTempDirectory("kyo-gen-test").toString)) + private def tmpDir(using Frame): Path < (Sync & Abort[FileFsException]) = + Path.tempDir("kyo-gen-test") - private def stubBundleDir(using Frame): Path < Sync = - Sync.defer { - val d = Files.createTempDirectory("kyo-bundle-stub") - java.nio.file.Files.writeString(d.resolve("main.js"), "// stub") - java.nio.file.Files.writeString(d.resolve("main.js.map"), "{}") - Path(d.toString) + private def stubBundleDir(using Frame): Path < (Sync & Abort[FileFsException | FileWriteException]) = + Path.tempDir("kyo-bundle-stub").map { d => + (d / "main.js").write("// stub") + .andThen((d / "main.js.map").write("{}")) + .andThen(d) } private def repoRoot(using Frame): Path = - Path(findRepoRoot().toString) - - private def findRepoRoot(): java.nio.file.Path = - val start = Paths.get(java.lang.System.getProperty("user.dir")).toAbsolutePath - Iterator - .iterate(start)(_.getParent) - .takeWhile(_ != null) - .find(dir => Files.exists(dir.resolve("build.sbt"))) - .getOrElse(throw new RuntimeException("repo root not found")) - end findRepoRoot + import AllowUnsafe.embrace.danger + @scala.annotation.tailrec + def loop(dir: Path): Path = + if (dir / "build.sbt").unsafe.exists() then dir + else + dir.parent match + case Maybe.Present(parent) => loop(parent) + case Maybe.Absent => throw new RuntimeException("repo root not found") + loop(Path(java.lang.System.getProperty("user.dir").nn)) + end repoRoot private def emit( content: Chunk[WebsiteContent], @@ -218,11 +215,9 @@ class WebsiteGeneratorTest extends WebsiteTest: "write failure aborts with WebsiteEmitException" in { for bundleDir <- stubBundleDir - tmp <- Sync.defer { - val d = Files.createTempDirectory("kyo-gen-fail-test") + tmp <- Path.tempDir("kyo-gen-fail-test").map { d => // Create a directory at index.html so writing a file there will fail - Files.createDirectory(Paths.get(d.toString, "index.html")) - Path(d.toString) + (d / "index.html").mkDir.andThen(d) } result <- Abort.run[WebsiteException](emit(oneVersion, tmp, bundleDir)) yield @@ -248,8 +243,8 @@ class WebsiteGeneratorTest extends WebsiteTest: "idempotent re-emit produces byte-identical files" in { for bundleDir <- stubBundleDir - out1 <- Sync.defer(Path(Files.createTempDirectory("kyo-gen-idem-a").toString)) - out2 <- Sync.defer(Path(Files.createTempDirectory("kyo-gen-idem-b").toString)) + out1 <- Path.tempDir("kyo-gen-idem-a") + out2 <- Path.tempDir("kyo-gen-idem-b") _ <- emit(oneVersion, out1, bundleDir) _ <- emit(oneVersion, out2, bundleDir) html1 <- readFile(out1 / "index.html") @@ -844,12 +839,15 @@ class WebsiteGeneratorTest extends WebsiteTest: sitemap <- readFile(out / "sitemap.xml") // Cross-check the count against the actual emitted latest// pages + 2 (the root // and the /latest/ overview). - latestSlugPages <- Sync.defer { - val latestDir = java.nio.file.Paths.get(out.toString, "latest") - if Files.exists(latestDir) then - Files.list(latestDir).filter(Files.isDirectory(_)).count().toInt - else 0 - } + latestSlugPages <- + val latestDir = out / "latest" + latestDir.exists.map { + case false => 0 + case true => + latestDir.list.map { entries => + Kyo.foreach(entries)(_.isDirectory).map(_.count(identity)) + } + } yield assert(sitemap.startsWith(": $sitemap") @@ -1584,7 +1582,7 @@ class WebsiteGeneratorTest extends WebsiteTest: for out <- tmpDir bundleDir <- stubBundleDir - noManifesto <- Sync.defer(Path(Files.createTempDirectory("kyo-no-manifesto").toString)) + noManifesto <- Path.tempDir("kyo-no-manifesto") result <- Abort.run[WebsiteException]( WebsiteGenerator.emit(Chunk(vWithModules), out, WebsiteGenerator.Config(noManifesto, bundleDir)) )