diff --git a/kyo-compat/README.md b/kyo-compat/README.md index 8c9626acf..93d30bb68 100644 --- a/kyo-compat/README.md +++ b/kyo-compat/README.md @@ -376,11 +376,11 @@ Available operations: `CMeter.init(permits)`, `meter.run(c): CIO[A]`, `meter.try ## Streams -`CStream[+A]` is a portable stream type alongside `CIO`. A library targeting the streams surface compiles unchanged against every binding; each backend wraps a native stream type where one exists, or supplies a hand-rolled implementation where the ecosystem lacks one. +`CStream[A]` is a portable stream type alongside `CIO`. A library targeting the streams surface compiles unchanged against every binding; each backend wraps a native stream type where one exists, or supplies a hand-rolled implementation where the ecosystem lacks one. It is invariant in `A` on every binding, so portable code never relies on a covariance one binding lacks: the Kyo binding wraps a richer computation that can carry multiple effect channels and so must be invariant (see below). -| Backend | `CStream[+A]` resolves to | +| Backend | `CStream[A]` resolves to | |----------------|------------------------------------------------------| -| Kyo | `kyo.Stream[A, Abort[Throwable] & Async]` | +| Kyo | holder over `kyo.Stream[A, Abort[Throwable] & Async]` | | ZIO | `zio.stream.ZStream[Any, Throwable, A]` | | Cats Effect | `fs2.Stream[cats.effect.IO, A]` | | Ox | `ox.Ox ?=> ox.flow.Flow[A]` | @@ -391,7 +391,11 @@ Platform footprints match the existing CIO surface: Kyo and ZIO ship JVM / JS / The kyo-named API tracks `kyo.Stream`: constructors `empty`, `init(seq)`, `init(c: CIO[Seq[A]])`, `range`, `unfold`; transforms `concat`, `mapPure` / `map`, `flatMap`, `tap`, `take`, `drop`, `takeWhilePure`, `filterPure` / `filter`, `collectPure`; and terminals `run: CIO[CChunk[A]]`, `foldPure`, `foreach`, `discard`. The pure/effectful split (`mapPure` vs. `map`, `filterPure` vs. `filter`, `foldPure`, `collectPure`, `takeWhilePure`) tracks the kyo convention; effectful variants take `A => CIO[B]`. -On the four bindings that wrap a third-party stream library (Kyo, ZIO, Cats Effect, Ox), every method is an `inline def` that compiles to a single native call, with at most a trivial type adapter (`Option ⇆ Maybe`, `n.toLong` for fs2/ZIO long-arity takes/drops, `Function.unlift` for partial-function collects, `Stream.eval(c.lower).flatMap(Stream.emits)` for fs2 `init`). The Twitter binding's `unfold` is the only exception on those four — `AsyncStream` ships no native unfold, so the wrap is a small recursive helper built on `AsyncStream.mk(head, => tail)`. The Future binding is the only fully hand-rolled implementation: `scala.concurrent.Future` has no canonical async stream, so the binding supplies a cons-stream where `Repr[A]` is a binding-private ADT (`Empty | Cons(head, tail: LocalCtx => Future[Repr[A]])`) matching the `CIO` carrier shape. Transformations build cons cells with lazy tails; terminal walks use a nested `@tailrec def loop` so 100000-element sync-completed streams don't blow the stack. +On the three bindings that wrap a third-party stream library with no extra plumbing (ZIO, Cats Effect, Ox), every method is an `inline def` that compiles to a single native call, with at most a trivial type adapter (`Option ⇆ Maybe`, `n.toLong` for fs2/ZIO long-arity takes/drops, `Function.unlift` for partial-function collects, `Stream.eval(c.lower).flatMap(Stream.emits)` for fs2 `init`). The Twitter binding's `unfold` is the only exception on those: `AsyncStream` ships no native unfold, so the wrap is a small recursive helper built on `AsyncStream.mk(head, => tail)`. + +The Future binding is the only fully hand-rolled implementation: `scala.concurrent.Future` has no canonical async stream, so the binding supplies a cons-stream where `Repr[A]` is a binding-private ADT (`Empty | Cons(head, tail: LocalCtx => Future[Repr[A]])`) matching the `CIO` carrier shape. Transformations build cons cells with lazy tails; terminal walks use a nested `@tailrec def loop` so 100000-element sync-completed streams don't blow the stack. + +The Kyo binding is a `final class` rather than a thin wrapper, because `kyo.Stream` re-summons a `Tag[Emit[Chunk[A]]]` on every operator: a plain delegation would leak that `Tag` onto callers and break generic pipelines. `CStream` captures the `Tag` once at construction (where `A` is concrete) and threads it through, so element-preserving operators stay tag-free and only element-changing ones (`map`/`flatMap`/...) ask for the output `Tag`. It is invariant because a lowered `kyo.Stream` is a richer computation that can carry several independent effect channels keyed by element type, and invariance keeps the stored tag matched to that carrier. ```scala import kyo.compat.* @@ -404,7 +408,7 @@ This compiles and runs against every binding × supported platform. Constructors and terminals not in the surface compose from what is: -- Failure stream: `CStream.init(CIO.fail(e))` — `init(c: CIO[Seq[A]])` propagates `c`'s failure. +- Failure stream: `CStream.init(CIO.fail(e))`, `init(c: CIO[Seq[A]])` propagates `c`'s failure. - Count: `s.foldPure(0L)((c, _) => c + 1L)`. ### Known divergences (kyo binding) diff --git a/kyo-compat/bindings/ce/shared/src/main/scala/kyo/compat/CStream.scala b/kyo-compat/bindings/ce/shared/src/main/scala/kyo/compat/CStream.scala index a97dcfea2..43baab5e7 100644 --- a/kyo-compat/bindings/ce/shared/src/main/scala/kyo/compat/CStream.scala +++ b/kyo-compat/bindings/ce/shared/src/main/scala/kyo/compat/CStream.scala @@ -8,7 +8,7 @@ import fs2.Stream * the carrier is already an `fs2.Stream`. Method names mirror `kyo.Stream` * (`mapPure`/`filterPure`/`takeWhilePure`/`collectPure`/`foldPure`/`discard`); the pure/effectful split tracks the kyo convention. */ -opaque type CStream[+A] = Stream[IO, A] +opaque type CStream[A] = Stream[IO, A] object CStream: diff --git a/kyo-compat/bindings/future/shared/src/main/scala/kyo/compat/CStream.scala b/kyo-compat/bindings/future/shared/src/main/scala/kyo/compat/CStream.scala index c778c9784..8d903564a 100644 --- a/kyo-compat/bindings/future/shared/src/main/scala/kyo/compat/CStream.scala +++ b/kyo-compat/bindings/future/shared/src/main/scala/kyo/compat/CStream.scala @@ -16,7 +16,7 @@ import scala.util.Success * sync-completed streams don't blow the stack. `lift` and `lower` are identity on the opaque `CStream[A]` since the binding's "native" * stream type is the carrier itself. */ -opaque type CStream[+A] = LocalCtx => Future[CStream.Repr[A]] +opaque type CStream[A] = LocalCtx => Future[CStream.Repr[A]] object CStream: diff --git a/kyo-compat/bindings/kyo/shared/src/main/scala/kyo/compat/CStream.scala b/kyo-compat/bindings/kyo/shared/src/main/scala/kyo/compat/CStream.scala index 1ec6fcd8e..4610bc9de 100644 --- a/kyo-compat/bindings/kyo/shared/src/main/scala/kyo/compat/CStream.scala +++ b/kyo-compat/bindings/kyo/shared/src/main/scala/kyo/compat/CStream.scala @@ -2,102 +2,140 @@ package kyo.compat import kyo.* -/** Underlying carrier is `kyo.Stream[A, Abort[Throwable] & Async]`. Operations preserve kyo `Frame`, `Tag`, and chunking semantics by - * delegating to the native `kyo.Stream` methods directly. `lift` and `lower` are identity since the carrier is already a kyo-native - * stream. Method names mirror `kyo.Stream` (`mapPure`/`filterPure`/`takeWhilePure`/`collectPure`/`foldPure`/`discard`); the pure/effectful - * split tracks the kyo convention. +/** Underlying carrier is `kyo.Stream[A, Abort[Throwable] & Async]`, paired with the `Tag[Emit[Chunk[A]]]` that keys its emit channel. + * + * `kyo.Stream` re-summons `Tag[Emit[Chunk[A]]]` on every operator, so generic code over an abstract `A` cannot call it. `CStream` captures + * that tag once, at construction (where `A` is concrete), and threads the stored tag into each delegated `kyo.Stream` call. The result is a + * tag-free surface for every element-preserving operator (`take`/`drop`/`filter`/`run`/`fold`/...), matching the other kyo-compat bindings + * so library authors can generalize a pipeline across libraries. + * + * Element-changing operators (`map`/`mapPure`/`flatMap`/`collectPure` → `CStream[B]`) require the output `Tag[Emit[Chunk[B]]]`: the stored + * `A`-tag cannot name `B`, and fabricating one would be unsound. `B` is concrete at every realistic call site, so the tag derives + * automatically there. + * + * `CStream` is invariant in `A`. Covariance would let `CStream[Dog]` flow where `CStream[Animal]` is expected while the stored tag still + * keys the `Dog` channel; the type-level widening would not be backed by the runtime key, silently dropping elements. Invariance keeps the + * stored tag honest, so `lower` returns a faithful native stream with no re-tagging and no casts. + * + * Method names mirror `kyo.Stream` (`mapPure`/`filterPure`/`takeWhilePure`/`collectPure`/`foldPure`/`discard`); the pure/effectful split + * tracks the kyo convention. */ -opaque type CStream[+A] = kyo.Stream[A, kyo.Abort[Throwable] & kyo.Async] +final class CStream[A] private[compat] ( + private[compat] val tag: Tag[Emit[Chunk[A]]], + private[compat] val stream: kyo.Stream[A, Abort[Throwable] & Async] +): + + /** Unwraps to the native `kyo.Stream`. The stored tag is always the honest key for the carrier, so this needs no re-tagging. */ + def lower: kyo.Stream[A, Abort[Throwable] & Async] = stream + + /** Concatenates `other` after `self`. */ + def concat[A2 >: A](other: CStream[A2])(using frame: Frame): CStream[A2] = + new CStream(other.tag, stream.concat(other.stream)) + + /** Maps each element with a pure function. */ + def mapPure[B](f: A => B)(using outTag: Tag[Emit[Chunk[B]]], frame: Frame): CStream[B] = + given tagA: Tag[Emit[Chunk[A]]] = tag + given tagB: Tag[Emit[Chunk[B]]] = outTag + new CStream(outTag, stream.mapPure(f)) + end mapPure + + /** Maps each element with an effectful function. */ + def map[B](f: A => CIO[B])(using outTag: Tag[Emit[Chunk[B]]], frame: Frame): CStream[B] = + given tagA: Tag[Emit[Chunk[A]]] = tag + given tagB: Tag[Emit[Chunk[B]]] = outTag + new CStream(outTag, stream.map(a => f(a).lower)) + end map + + /** Flat-maps each element to another stream and concatenates the results. */ + def flatMap[B](f: A => CStream[B])(using outTag: Tag[Emit[Chunk[B]]], frame: Frame): CStream[B] = + given tagA: Tag[Emit[Chunk[A]]] = tag + given tagB: Tag[Emit[Chunk[B]]] = outTag + new CStream(outTag, stream.flatMap(a => f(a).lower)) + end flatMap + + /** Runs `f` for its effect on each element, passing the element through unchanged. */ + def tap(f: A => CIO[Any])(using frame: Frame): CStream[A] = + given Tag[Emit[Chunk[A]]] = tag + new CStream(tag, stream.tap(a => f(a).lower)) + + /** Takes the first `n` elements. */ + def take(n: Int)(using frame: Frame): CStream[A] = + given Tag[Emit[Chunk[A]]] = tag + new CStream(tag, stream.take(n)) + + /** Drops the first `n` elements. */ + def drop(n: Int)(using frame: Frame): CStream[A] = + given Tag[Emit[Chunk[A]]] = tag + new CStream(tag, stream.drop(n)) + + /** Takes elements while `p` holds. */ + def takeWhilePure(p: A => Boolean)(using frame: Frame): CStream[A] = + given Tag[Emit[Chunk[A]]] = tag + new CStream(tag, stream.takeWhilePure(p)) + + /** Keeps elements matching the pure predicate. */ + def filterPure(p: A => Boolean)(using frame: Frame): CStream[A] = + given Tag[Emit[Chunk[A]]] = tag + new CStream(tag, stream.filterPure(p)) + + /** Keeps elements matching the effectful predicate. */ + def filter(p: A => CIO[Boolean])(using frame: Frame): CStream[A] = + given Tag[Emit[Chunk[A]]] = tag + new CStream(tag, stream.filter(a => p(a).lower)) + + /** Maps and filters in one pass via a pure partial mapping. */ + def collectPure[B](f: A => Option[B])(using outTag: Tag[Emit[Chunk[B]]], frame: Frame): CStream[B] = + given tagA: Tag[Emit[Chunk[A]]] = tag + given tagB: Tag[Emit[Chunk[B]]] = outTag + new CStream(outTag, stream.collectPure(a => Maybe.fromOption(f(a)))) + end collectPure + + /** Runs the stream and collects all emitted elements into a `CChunk`. */ + def run(using frame: Frame): CIO[CChunk[A]] = + given Tag[Emit[Chunk[A]]] = tag + CIO.lift(stream.run.map(CChunk.lift(_))) + + /** Folds the stream with a pure accumulator. */ + def foldPure[B](acc: B)(f: (B, A) => B)(using frame: Frame): CIO[B] = + given Tag[Emit[Chunk[A]]] = tag + CIO.lift(stream.foldPure(acc)(f)) + + /** Runs `f` for its effect on each element, discarding results. */ + def foreach(f: A => CIO[Unit])(using frame: Frame): CIO[Unit] = + given Tag[Emit[Chunk[A]]] = tag + CIO.lift(stream.foreach(a => f(a).lower)) + + /** Runs the stream and discards all emitted elements. */ + def discard(using frame: Frame): CIO[Unit] = + given Tag[Emit[Chunk[A]]] = tag + CIO.lift(stream.discard) + +end CStream object CStream: /** Empty stream. */ - inline def empty[A]: CStream[A] = kyo.Stream.empty[A] - - /** Stream emitting the elements produced by the effectful sequence. */ - inline def init[A](inline c: CIO[Seq[A]])(using inline frame: Frame): CStream[A] = - kyo.Stream.init(c.lower) + def empty[A](using tag: Tag[Emit[Chunk[A]]]): CStream[A] = + new CStream(tag, kyo.Stream.empty[A]) /** Stream emitting the elements of the given sequence. */ - inline def init[A](inline seq: Seq[A])(using inline frame: Frame): CStream[A] = - kyo.Stream.init(seq) + def init[A](seq: Seq[A])(using tag: Tag[Emit[Chunk[A]]], frame: Frame): CStream[A] = + new CStream(tag, kyo.Stream.init(seq)) + + /** Stream emitting the elements produced by the effectful sequence. */ + def init[A](c: CIO[Seq[A]])(using tag: Tag[Emit[Chunk[A]]], frame: Frame): CStream[A] = + new CStream(tag, kyo.Stream.init(c.lower)) /** Stream emitting `start` until `end` (exclusive), step 1. */ - inline def range(inline start: Int, inline end: Int)(using inline frame: Frame): CStream[Int] = - kyo.Stream.range(start, end) + def range(start: Int, end: Int)(using frame: Frame): CStream[Int] = + new CStream(Tag[Emit[Chunk[Int]]], kyo.Stream.range(start, end)) /** Stream produced by iteratively unfolding `acc` through `f`; emission stops when `f` yields `None`. */ - inline def unfold[S, A](inline acc: S)(inline f: S => CIO[Option[(A, S)]])(using inline frame: Frame): CStream[A] = - kyo.Stream.unfold(acc)(s => f(s).lower.map(kyo.Maybe.fromOption)) - - /** Wraps a native `kyo.Stream` as a `CStream`. Identity on the carrier. */ - inline def lift[A](inline native: kyo.Stream[A, kyo.Abort[Throwable] & kyo.Async]): CStream[A] = native - - extension [A](inline self: CStream[A]) - - /** Unwraps to the native `kyo.Stream`. Identity on the carrier. */ - inline def lower: kyo.Stream[A, kyo.Abort[Throwable] & kyo.Async] = self - - /** Concatenates `other` after `self`. */ - inline def concat[A2 >: A](inline other: CStream[A2])(using inline frame: Frame): CStream[A2] = - self.concat(other.lower) - - /** Maps each element with a pure function. */ - inline def mapPure[B](inline f: A => B)(using inline frame: Frame): CStream[B] = - self.mapPure(f) - - /** Maps each element with an effectful function. */ - inline def map[B](inline f: A => CIO[B])(using inline frame: Frame): CStream[B] = - self.map(a => f(a).lower) - - /** Flat-maps each element to another stream and concatenates the results. */ - inline def flatMap[B](inline f: A => CStream[B])(using inline frame: Frame): CStream[B] = - self.flatMap(a => f(a).lower) - - /** Runs `f` for its effect on each element, passing the element through unchanged. */ - inline def tap(inline f: A => CIO[Any])(using inline frame: Frame): CStream[A] = - self.tap(a => f(a).lower) - - /** Takes the first `n` elements. */ - inline def take(inline n: Int)(using inline frame: Frame): CStream[A] = - self.take(n) - - /** Drops the first `n` elements. */ - inline def drop(inline n: Int)(using inline frame: Frame): CStream[A] = - self.drop(n) - - /** Takes elements while `p` holds. */ - inline def takeWhilePure(inline p: A => Boolean)(using inline frame: Frame): CStream[A] = - self.takeWhilePure(p) - - /** Keeps elements matching the pure predicate. */ - inline def filterPure(inline p: A => Boolean)(using inline frame: Frame): CStream[A] = - self.filterPure(p) - - /** Keeps elements matching the effectful predicate. */ - inline def filter(inline p: A => CIO[Boolean])(using inline frame: Frame): CStream[A] = - self.filter(a => p(a).lower) - - /** Maps and filters in one pass via a pure partial mapping. */ - inline def collectPure[B](inline f: A => Option[B])(using inline frame: Frame): CStream[B] = - self.collectPure(a => kyo.Maybe.fromOption(f(a))) - - /** Runs the stream and collects all emitted elements into a `CChunk`. */ - inline def run(using inline frame: Frame): CIO[CChunk[A]] = - CIO.lift(self.run.map(CChunk.lift(_))) - - /** Folds the stream with a pure accumulator. */ - inline def foldPure[B](inline acc: B)(inline f: (B, A) => B)(using inline frame: Frame): CIO[B] = - CIO.lift(self.foldPure(acc)(f)) - - /** Runs `f` for its effect on each element, discarding results. */ - inline def foreach(inline f: A => CIO[Unit])(using inline frame: Frame): CIO[Unit] = - CIO.lift(self.foreach(a => f(a).lower)) - - /** Runs the stream and discards all emitted elements. */ - inline def discard(using inline frame: Frame): CIO[Unit] = - CIO.lift(self.discard) + def unfold[S, A](acc: S)(f: S => CIO[Option[(A, S)]])(using tag: Tag[Emit[Chunk[A]]], frame: Frame): CStream[A] = + new CStream(tag, kyo.Stream.unfold(acc)(s => f(s).lower.map(Maybe.fromOption))) - end extension + /** Wraps a native `kyo.Stream` as a `CStream`, capturing its emit-channel tag. */ + def lift[A](native: kyo.Stream[A, Abort[Throwable] & Async])(using tag: Tag[Emit[Chunk[A]]]): CStream[A] = + new CStream(tag, native) end CStream diff --git a/kyo-compat/bindings/kyo/shared/src/test/scala/kyo/compat/CStreamTagFreeTest.scala b/kyo-compat/bindings/kyo/shared/src/test/scala/kyo/compat/CStreamTagFreeTest.scala new file mode 100644 index 000000000..0ce3ae7dc --- /dev/null +++ b/kyo-compat/bindings/kyo/shared/src/test/scala/kyo/compat/CStreamTagFreeTest.scala @@ -0,0 +1,122 @@ +package kyo.compat + +import kyo.* +import org.scalactic.Prettifier + +/** Soundness and portability tests for the holder-based `CStream` (kyo binding only). + * + * The point of the holder design is that element-preserving operators carry no `Tag` requirement, so a library author can write a pipeline + * generic in the element type and have it compile against the kyo binding exactly as it does against the others. These tests pin that down, + * and pin down the channel-isolation soundness that the tag (not an erased `Any` key) provides. + */ +class CStreamTagFreeTest extends CompatTest: + + // CompatTest mixes in NonImplicitAssertions, which suppresses the ambient Prettifier that + // assertCompiles/assertDoesNotCompile require; reinstate it locally. + given Prettifier = Prettifier.default + + // --- Portability: generic-element pipelines compile with NO Tag bound --- + // Each helper below is generic in `A` and uses NO `using Tag[...]`. Before the holder design these would not + // compile (kyo.Stream re-summons Tag[Emit[Chunk[A]]] per operator); now they do. Their mere compilation is the + // regression guard; the assertions confirm they also behave correctly. + + def preserve[A](s: CStream[A]): CStream[A] = + s.take(5).drop(1).filterPure(_ => true) + + def collectAll[A](s: CStream[A]): CIO[CChunk[A]] = + s.run + + def count[A](s: CStream[A]): CIO[Int] = + s.foldPure(0)((n, _) => n + 1) + + def duplicate[A](s: CStream[A]): CStream[A] = + s.concat(s) + + "generic element-preserving pipeline runs without a Tag bound" in run { + preserve(CStream.init(Seq(1, 2, 3, 4, 5, 6, 7))).run.map(c => assert(c.toSeq == Seq(2, 3, 4, 5))) + } + + "generic element-preserving pipeline works for a non-Int element type" in run { + preserve(CStream.init(Seq("a", "b", "c", "d", "e", "f"))).run.map(c => assert(c.toSeq == Seq("b", "c", "d", "e"))) + } + + "generic terminal run works without a Tag bound" in run { + collectAll(CStream.init(Seq(10, 20, 30))).map(c => assert(c.toSeq == Seq(10, 20, 30))) + } + + "generic foldPure-based count works without a Tag bound" in run { + count(CStream.init(Seq("x", "y", "z", "w"))).map(n => assert(n == 4)) + } + + "generic concat works without a Tag bound" in run { + duplicate(CStream.init(Seq(1, 2))).run.map(c => assert(c.toSeq == Seq(1, 2, 1, 2))) + } + + // --- Soundness: distinct element channels never cross-talk --- + // If the holder keyed its channel with an erased `Tag[Emit[Chunk[Any]]]`, a flatMap from Int to String would let + // the handler greedily capture both channels. With the honest per-element tag, only the intended channel matches. + + "flatMap from Int to String emits only the String channel" in run { + CStream.init(Seq(1, 2)) + .flatMap(i => CStream.init(Seq(s"a$i", s"b$i"))) + .run.map(c => assert(c.toSeq == Seq("a1", "b1", "a2", "b2"))) + } + + "nested flatMap Int -> String -> Boolean keeps only the final channel" in run { + CStream.init(Seq(1, 2)) + .flatMap(i => CStream.init(Seq(s"v$i"))) + .flatMap(s => CStream.init(Seq(s.length % 2 == 0, s.length % 2 == 1))) + .run.map(c => assert(c.toSeq == Seq(true, false, true, false))) + } + + "map changing element type to String, then back to length, stays isolated" in run { + CStream.init(Seq(1, 22, 333)) + .map(i => CIO.value(i.toString)) + .mapPure(_.length) + .run.map(c => assert(c.toSeq == Seq(1, 2, 3))) + } + + // --- Soundness: lower returns a faithful native stream after an element change --- + // The stored tag is always honest, so lowering a mapped CStream yields a native kyo.Stream whose emit channel + // matches the honest output tag. Running it natively (with a freshly summoned tag) must see every element. + + "lower after an element-changing map yields a native stream that runs faithfully" in run { + val native: kyo.Stream[String, Abort[Throwable] & Async] = + CStream.init(Seq(1, 2, 3)).map(i => CIO.value(s"x$i")).lower + CIO.lift(native.run).map(chunk => assert(chunk.toSeq == Seq("x1", "x2", "x3"))) + } + + "lift(native).run round-trips a hand-built native stream" in run { + val native = kyo.Stream.init(Seq(7, 8, 9)) + CStream.lift(native).run.map(c => assert(c.toSeq == Seq(7, 8, 9))) + } + + // --- Characterization: the B1 boundary and invariance, asserted at the type level --- + + "element-preserving operators need no Tag, even fully generic" in { + assertCompiles("def f[A](s: CStream[A]): CStream[A] = s.take(1).drop(0).filterPure(_ => true)") + } + + "a fully-generic terminal needs no Tag" in { + assertCompiles("def f[A](s: CStream[A]): CIO[CChunk[A]] = s.run") + } + + "mapping to a fully-abstract element type without its Tag does NOT compile" in { + // B1: the stored A-tag cannot name B; the output Tag must be supplied (it derives automatically for concrete B). + assertDoesNotCompile("def f[A, B](s: CStream[A])(g: A => CIO[B]): CStream[B] = s.map(g)") + } + + "mapping to an abstract element type WITH its Tag compiles" in { + assertCompiles( + "def f[A, B](s: CStream[A])(g: A => CIO[B])(using kyo.Tag[kyo.Emit[kyo.Chunk[B]]]): CStream[B] = s.map(g)" + ) + } + + "CStream is invariant: a CStream[Int] does not widen to CStream[Any]" in { + // A single declaration that only typechecks under covariant subsumption: the parameter is already typed + // CStream[Int], so no A-inference can adapt it. Covariance would desync the static element type from the + // stored runtime key; invariance keeps the tag honest. + assertDoesNotCompile("def widen(s: CStream[Int]): CStream[Any] = s") + } + +end CStreamTagFreeTest diff --git a/kyo-compat/bindings/ox/jvm/src/main/scala/kyo/compat/CStream.scala b/kyo-compat/bindings/ox/jvm/src/main/scala/kyo/compat/CStream.scala index c37dbef01..49d5c183e 100644 --- a/kyo-compat/bindings/ox/jvm/src/main/scala/kyo/compat/CStream.scala +++ b/kyo-compat/bindings/ox/jvm/src/main/scala/kyo/compat/CStream.scala @@ -10,7 +10,7 @@ import ox.flow.Flow * argument. Operations delegate to the native `Flow` methods directly; method names mirror `kyo.Stream` * (`mapPure`/`filterPure`/`takeWhilePure`/`collectPure`/`foldPure`/`discard`); the pure/effectful split tracks the kyo convention. */ -opaque type CStream[+A] = Ox => Flow[A] +opaque type CStream[A] = Ox => Flow[A] object CStream: diff --git a/kyo-compat/bindings/twitter-future/jvm/src/main/scala/kyo/compat/CStream.scala b/kyo-compat/bindings/twitter-future/jvm/src/main/scala/kyo/compat/CStream.scala index d8426e472..daff936ae 100644 --- a/kyo-compat/bindings/twitter-future/jvm/src/main/scala/kyo/compat/CStream.scala +++ b/kyo-compat/bindings/twitter-future/jvm/src/main/scala/kyo/compat/CStream.scala @@ -10,7 +10,7 @@ import com.twitter.concurrent.AsyncStream * `foreach` / `discard` lower to `AsyncStream.toSeq()` / `foldLeft` / `foreachF` / `force` respectively, wrapped via `CIO.deferLift` so * each materialization re-runs the stream. */ -opaque type CStream[+A] = AsyncStream[A] +opaque type CStream[A] = AsyncStream[A] object CStream: diff --git a/kyo-compat/bindings/zio/shared/src/main/scala/kyo/compat/CStream.scala b/kyo-compat/bindings/zio/shared/src/main/scala/kyo/compat/CStream.scala index ff4d90207..1cb627015 100644 --- a/kyo-compat/bindings/zio/shared/src/main/scala/kyo/compat/CStream.scala +++ b/kyo-compat/bindings/zio/shared/src/main/scala/kyo/compat/CStream.scala @@ -8,7 +8,7 @@ import zio.stream.ZStream * already a `ZStream`. Method names mirror `kyo.Stream` (`mapPure`/`filterPure`/`takeWhilePure`/`collectPure`/`foldPure`/`discard`); the * pure/effectful split tracks the kyo convention. */ -opaque type CStream[+A] = ZStream[Any, Throwable, A] +opaque type CStream[A] = ZStream[Any, Throwable, A] object CStream: