Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions kyo-compat/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,11 @@ Available operations: `CMeter.init(permits)`, `meter.run(c): CIO[A]`, `meter.try

## Streams

`CStream[+A]` is a portable stream type alongside `CIO`. A library targeting the streams surface compiles unchanged against every binding; each backend wraps a native stream type where one exists, or supplies a hand-rolled implementation where the ecosystem lacks one.
`CStream[A]` is a portable stream type alongside `CIO`. A library targeting the streams surface compiles unchanged against every binding; each backend wraps a native stream type where one exists, or supplies a hand-rolled implementation where the ecosystem lacks one. It is invariant in `A` on every binding, so portable code never relies on a covariance one binding lacks: the Kyo binding wraps a richer computation that can carry multiple effect channels and so must be invariant (see below).

| Backend | `CStream[+A]` resolves to |
| Backend | `CStream[A]` resolves to |
|----------------|------------------------------------------------------|
| Kyo | `kyo.Stream[A, Abort[Throwable] & Async]` |
| Kyo | holder over `kyo.Stream[A, Abort[Throwable] & Async]` |
| ZIO | `zio.stream.ZStream[Any, Throwable, A]` |
| Cats Effect | `fs2.Stream[cats.effect.IO, A]` |
| Ox | `ox.Ox ?=> ox.flow.Flow[A]` |
Expand All @@ -391,7 +391,11 @@ Platform footprints match the existing CIO surface: Kyo and ZIO ship JVM / JS /

The kyo-named API tracks `kyo.Stream`: constructors `empty`, `init(seq)`, `init(c: CIO[Seq[A]])`, `range`, `unfold`; transforms `concat`, `mapPure` / `map`, `flatMap`, `tap`, `take`, `drop`, `takeWhilePure`, `filterPure` / `filter`, `collectPure`; and terminals `run: CIO[CChunk[A]]`, `foldPure`, `foreach`, `discard`. The pure/effectful split (`mapPure` vs. `map`, `filterPure` vs. `filter`, `foldPure`, `collectPure`, `takeWhilePure`) tracks the kyo convention; effectful variants take `A => CIO[B]`.

On the four bindings that wrap a third-party stream library (Kyo, ZIO, Cats Effect, Ox), every method is an `inline def` that compiles to a single native call, with at most a trivial type adapter (`Option ⇆ Maybe`, `n.toLong` for fs2/ZIO long-arity takes/drops, `Function.unlift` for partial-function collects, `Stream.eval(c.lower).flatMap(Stream.emits)` for fs2 `init`). The Twitter binding's `unfold` is the only exception on those four — `AsyncStream` ships no native unfold, so the wrap is a small recursive helper built on `AsyncStream.mk(head, => tail)`. The Future binding is the only fully hand-rolled implementation: `scala.concurrent.Future` has no canonical async stream, so the binding supplies a cons-stream where `Repr[A]` is a binding-private ADT (`Empty | Cons(head, tail: LocalCtx => Future[Repr[A]])`) matching the `CIO` carrier shape. Transformations build cons cells with lazy tails; terminal walks use a nested `@tailrec def loop` so 100000-element sync-completed streams don't blow the stack.
On the three bindings that wrap a third-party stream library with no extra plumbing (ZIO, Cats Effect, Ox), every method is an `inline def` that compiles to a single native call, with at most a trivial type adapter (`Option ⇆ Maybe`, `n.toLong` for fs2/ZIO long-arity takes/drops, `Function.unlift` for partial-function collects, `Stream.eval(c.lower).flatMap(Stream.emits)` for fs2 `init`). The Twitter binding's `unfold` is the only exception on those: `AsyncStream` ships no native unfold, so the wrap is a small recursive helper built on `AsyncStream.mk(head, => tail)`.

The Future binding is the only fully hand-rolled implementation: `scala.concurrent.Future` has no canonical async stream, so the binding supplies a cons-stream where `Repr[A]` is a binding-private ADT (`Empty | Cons(head, tail: LocalCtx => Future[Repr[A]])`) matching the `CIO` carrier shape. Transformations build cons cells with lazy tails; terminal walks use a nested `@tailrec def loop` so 100000-element sync-completed streams don't blow the stack.

The Kyo binding is a `final class` rather than a thin wrapper, because `kyo.Stream` re-summons a `Tag[Emit[Chunk[A]]]` on every operator: a plain delegation would leak that `Tag` onto callers and break generic pipelines. `CStream` captures the `Tag` once at construction (where `A` is concrete) and threads it through, so element-preserving operators stay tag-free and only element-changing ones (`map`/`flatMap`/...) ask for the output `Tag`. It is invariant because a lowered `kyo.Stream` is a richer computation that can carry several independent effect channels keyed by element type, and invariance keeps the stored tag matched to that carrier.

```scala
import kyo.compat.*
Expand All @@ -404,7 +408,7 @@ This compiles and runs against every binding × supported platform.

Constructors and terminals not in the surface compose from what is:

- Failure stream: `CStream.init(CIO.fail(e))` `init(c: CIO[Seq[A]])` propagates `c`'s failure.
- Failure stream: `CStream.init(CIO.fail(e))`, `init(c: CIO[Seq[A]])` propagates `c`'s failure.
- Count: `s.foldPure(0L)((c, _) => c + 1L)`.

### Known divergences (kyo binding)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import fs2.Stream
* the carrier is already an `fs2.Stream`. Method names mirror `kyo.Stream`
* (`mapPure`/`filterPure`/`takeWhilePure`/`collectPure`/`foldPure`/`discard`); the pure/effectful split tracks the kyo convention.
*/
opaque type CStream[+A] = Stream[IO, A]
opaque type CStream[A] = Stream[IO, A]

object CStream:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import scala.util.Success
* sync-completed streams don't blow the stack. `lift` and `lower` are identity on the opaque `CStream[A]` since the binding's "native"
* stream type is the carrier itself.
*/
opaque type CStream[+A] = LocalCtx => Future[CStream.Repr[A]]
opaque type CStream[A] = LocalCtx => Future[CStream.Repr[A]]

object CStream:

Expand Down
208 changes: 123 additions & 85 deletions kyo-compat/bindings/kyo/shared/src/main/scala/kyo/compat/CStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,102 +2,140 @@ package kyo.compat

import kyo.*

/** Underlying carrier is `kyo.Stream[A, Abort[Throwable] & Async]`. Operations preserve kyo `Frame`, `Tag`, and chunking semantics by
* delegating to the native `kyo.Stream` methods directly. `lift` and `lower` are identity since the carrier is already a kyo-native
* stream. Method names mirror `kyo.Stream` (`mapPure`/`filterPure`/`takeWhilePure`/`collectPure`/`foldPure`/`discard`); the pure/effectful
* split tracks the kyo convention.
/** Underlying carrier is `kyo.Stream[A, Abort[Throwable] & Async]`, paired with the `Tag[Emit[Chunk[A]]]` that keys its emit channel.
*
* `kyo.Stream` re-summons `Tag[Emit[Chunk[A]]]` on every operator, so generic code over an abstract `A` cannot call it. `CStream` captures
* that tag once, at construction (where `A` is concrete), and threads the stored tag into each delegated `kyo.Stream` call. The result is a
* tag-free surface for every element-preserving operator (`take`/`drop`/`filter`/`run`/`fold`/...), matching the other kyo-compat bindings
* so library authors can generalize a pipeline across libraries.
*
* Element-changing operators (`map`/`mapPure`/`flatMap`/`collectPure` → `CStream[B]`) require the output `Tag[Emit[Chunk[B]]]`: the stored
* `A`-tag cannot name `B`, and fabricating one would be unsound. `B` is concrete at every realistic call site, so the tag derives
* automatically there.
*
* `CStream` is invariant in `A`. Covariance would let `CStream[Dog]` flow where `CStream[Animal]` is expected while the stored tag still
* keys the `Dog` channel; the type-level widening would not be backed by the runtime key, silently dropping elements. Invariance keeps the
* stored tag honest, so `lower` returns a faithful native stream with no re-tagging and no casts.
*
* Method names mirror `kyo.Stream` (`mapPure`/`filterPure`/`takeWhilePure`/`collectPure`/`foldPure`/`discard`); the pure/effectful split
* tracks the kyo convention.
*/
opaque type CStream[+A] = kyo.Stream[A, kyo.Abort[Throwable] & kyo.Async]
final class CStream[A] private[compat] (
private[compat] val tag: Tag[Emit[Chunk[A]]],
private[compat] val stream: kyo.Stream[A, Abort[Throwable] & Async]
):

/** Unwraps to the native `kyo.Stream`. The stored tag is always the honest key for the carrier, so this needs no re-tagging. */
def lower: kyo.Stream[A, Abort[Throwable] & Async] = stream

/** Concatenates `other` after `self`. */
def concat[A2 >: A](other: CStream[A2])(using frame: Frame): CStream[A2] =
new CStream(other.tag, stream.concat(other.stream))

/** Maps each element with a pure function. */
def mapPure[B](f: A => B)(using outTag: Tag[Emit[Chunk[B]]], frame: Frame): CStream[B] =
given tagA: Tag[Emit[Chunk[A]]] = tag
given tagB: Tag[Emit[Chunk[B]]] = outTag
new CStream(outTag, stream.mapPure(f))
end mapPure

/** Maps each element with an effectful function. */
def map[B](f: A => CIO[B])(using outTag: Tag[Emit[Chunk[B]]], frame: Frame): CStream[B] =
given tagA: Tag[Emit[Chunk[A]]] = tag
given tagB: Tag[Emit[Chunk[B]]] = outTag
new CStream(outTag, stream.map(a => f(a).lower))
end map

/** Flat-maps each element to another stream and concatenates the results. */
def flatMap[B](f: A => CStream[B])(using outTag: Tag[Emit[Chunk[B]]], frame: Frame): CStream[B] =
given tagA: Tag[Emit[Chunk[A]]] = tag

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make this tag given in the class?

given tagB: Tag[Emit[Chunk[B]]] = outTag
new CStream(outTag, stream.flatMap(a => f(a).lower))
end flatMap

/** Runs `f` for its effect on each element, passing the element through unchanged. */
def tap(f: A => CIO[Any])(using frame: Frame): CStream[A] =
given Tag[Emit[Chunk[A]]] = tag
new CStream(tag, stream.tap(a => f(a).lower))

/** Takes the first `n` elements. */
def take(n: Int)(using frame: Frame): CStream[A] =
given Tag[Emit[Chunk[A]]] = tag
new CStream(tag, stream.take(n))

/** Drops the first `n` elements. */
def drop(n: Int)(using frame: Frame): CStream[A] =
given Tag[Emit[Chunk[A]]] = tag
new CStream(tag, stream.drop(n))

/** Takes elements while `p` holds. */
def takeWhilePure(p: A => Boolean)(using frame: Frame): CStream[A] =
given Tag[Emit[Chunk[A]]] = tag
new CStream(tag, stream.takeWhilePure(p))

/** Keeps elements matching the pure predicate. */
def filterPure(p: A => Boolean)(using frame: Frame): CStream[A] =
given Tag[Emit[Chunk[A]]] = tag
new CStream(tag, stream.filterPure(p))

/** Keeps elements matching the effectful predicate. */
def filter(p: A => CIO[Boolean])(using frame: Frame): CStream[A] =
given Tag[Emit[Chunk[A]]] = tag
new CStream(tag, stream.filter(a => p(a).lower))

/** Maps and filters in one pass via a pure partial mapping. */
def collectPure[B](f: A => Option[B])(using outTag: Tag[Emit[Chunk[B]]], frame: Frame): CStream[B] =
given tagA: Tag[Emit[Chunk[A]]] = tag
given tagB: Tag[Emit[Chunk[B]]] = outTag
new CStream(outTag, stream.collectPure(a => Maybe.fromOption(f(a))))
end collectPure

/** Runs the stream and collects all emitted elements into a `CChunk`. */
def run(using frame: Frame): CIO[CChunk[A]] =
given Tag[Emit[Chunk[A]]] = tag
CIO.lift(stream.run.map(CChunk.lift(_)))

/** Folds the stream with a pure accumulator. */
def foldPure[B](acc: B)(f: (B, A) => B)(using frame: Frame): CIO[B] =
given Tag[Emit[Chunk[A]]] = tag
CIO.lift(stream.foldPure(acc)(f))

/** Runs `f` for its effect on each element, discarding results. */
def foreach(f: A => CIO[Unit])(using frame: Frame): CIO[Unit] =
given Tag[Emit[Chunk[A]]] = tag
CIO.lift(stream.foreach(a => f(a).lower))

/** Runs the stream and discards all emitted elements. */
def discard(using frame: Frame): CIO[Unit] =
given Tag[Emit[Chunk[A]]] = tag
CIO.lift(stream.discard)

end CStream

object CStream:

/** Empty stream. */
inline def empty[A]: CStream[A] = kyo.Stream.empty[A]

/** Stream emitting the elements produced by the effectful sequence. */
inline def init[A](inline c: CIO[Seq[A]])(using inline frame: Frame): CStream[A] =
kyo.Stream.init(c.lower)
def empty[A](using tag: Tag[Emit[Chunk[A]]]): CStream[A] =
new CStream(tag, kyo.Stream.empty[A])

/** Stream emitting the elements of the given sequence. */
inline def init[A](inline seq: Seq[A])(using inline frame: Frame): CStream[A] =
kyo.Stream.init(seq)
def init[A](seq: Seq[A])(using tag: Tag[Emit[Chunk[A]]], frame: Frame): CStream[A] =
new CStream(tag, kyo.Stream.init(seq))

/** Stream emitting the elements produced by the effectful sequence. */
def init[A](c: CIO[Seq[A]])(using tag: Tag[Emit[Chunk[A]]], frame: Frame): CStream[A] =
new CStream(tag, kyo.Stream.init(c.lower))

/** Stream emitting `start` until `end` (exclusive), step 1. */
inline def range(inline start: Int, inline end: Int)(using inline frame: Frame): CStream[Int] =
kyo.Stream.range(start, end)
def range(start: Int, end: Int)(using frame: Frame): CStream[Int] =
new CStream(Tag[Emit[Chunk[Int]]], kyo.Stream.range(start, end))

/** Stream produced by iteratively unfolding `acc` through `f`; emission stops when `f` yields `None`. */
inline def unfold[S, A](inline acc: S)(inline f: S => CIO[Option[(A, S)]])(using inline frame: Frame): CStream[A] =
kyo.Stream.unfold(acc)(s => f(s).lower.map(kyo.Maybe.fromOption))

/** Wraps a native `kyo.Stream` as a `CStream`. Identity on the carrier. */
inline def lift[A](inline native: kyo.Stream[A, kyo.Abort[Throwable] & kyo.Async]): CStream[A] = native

extension [A](inline self: CStream[A])

/** Unwraps to the native `kyo.Stream`. Identity on the carrier. */
inline def lower: kyo.Stream[A, kyo.Abort[Throwable] & kyo.Async] = self

/** Concatenates `other` after `self`. */
inline def concat[A2 >: A](inline other: CStream[A2])(using inline frame: Frame): CStream[A2] =
self.concat(other.lower)

/** Maps each element with a pure function. */
inline def mapPure[B](inline f: A => B)(using inline frame: Frame): CStream[B] =
self.mapPure(f)

/** Maps each element with an effectful function. */
inline def map[B](inline f: A => CIO[B])(using inline frame: Frame): CStream[B] =
self.map(a => f(a).lower)

/** Flat-maps each element to another stream and concatenates the results. */
inline def flatMap[B](inline f: A => CStream[B])(using inline frame: Frame): CStream[B] =
self.flatMap(a => f(a).lower)

/** Runs `f` for its effect on each element, passing the element through unchanged. */
inline def tap(inline f: A => CIO[Any])(using inline frame: Frame): CStream[A] =
self.tap(a => f(a).lower)

/** Takes the first `n` elements. */
inline def take(inline n: Int)(using inline frame: Frame): CStream[A] =
self.take(n)

/** Drops the first `n` elements. */
inline def drop(inline n: Int)(using inline frame: Frame): CStream[A] =
self.drop(n)

/** Takes elements while `p` holds. */
inline def takeWhilePure(inline p: A => Boolean)(using inline frame: Frame): CStream[A] =
self.takeWhilePure(p)

/** Keeps elements matching the pure predicate. */
inline def filterPure(inline p: A => Boolean)(using inline frame: Frame): CStream[A] =
self.filterPure(p)

/** Keeps elements matching the effectful predicate. */
inline def filter(inline p: A => CIO[Boolean])(using inline frame: Frame): CStream[A] =
self.filter(a => p(a).lower)

/** Maps and filters in one pass via a pure partial mapping. */
inline def collectPure[B](inline f: A => Option[B])(using inline frame: Frame): CStream[B] =
self.collectPure(a => kyo.Maybe.fromOption(f(a)))

/** Runs the stream and collects all emitted elements into a `CChunk`. */
inline def run(using inline frame: Frame): CIO[CChunk[A]] =
CIO.lift(self.run.map(CChunk.lift(_)))

/** Folds the stream with a pure accumulator. */
inline def foldPure[B](inline acc: B)(inline f: (B, A) => B)(using inline frame: Frame): CIO[B] =
CIO.lift(self.foldPure(acc)(f))

/** Runs `f` for its effect on each element, discarding results. */
inline def foreach(inline f: A => CIO[Unit])(using inline frame: Frame): CIO[Unit] =
CIO.lift(self.foreach(a => f(a).lower))

/** Runs the stream and discards all emitted elements. */
inline def discard(using inline frame: Frame): CIO[Unit] =
CIO.lift(self.discard)
def unfold[S, A](acc: S)(f: S => CIO[Option[(A, S)]])(using tag: Tag[Emit[Chunk[A]]], frame: Frame): CStream[A] =
new CStream(tag, kyo.Stream.unfold(acc)(s => f(s).lower.map(Maybe.fromOption)))

end extension
/** Wraps a native `kyo.Stream` as a `CStream`, capturing its emit-channel tag. */
def lift[A](native: kyo.Stream[A, Abort[Throwable] & Async])(using tag: Tag[Emit[Chunk[A]]]): CStream[A] =
new CStream(tag, native)

end CStream
Loading