[compat] kyo-compat CStream: tag-free, uniformly invariant portable surface#1650
Open
fwbrasil wants to merge 1 commit into
Open
[compat] kyo-compat CStream: tag-free, uniformly invariant portable surface#1650fwbrasil wants to merge 1 commit into
fwbrasil wants to merge 1 commit into
Conversation
…urface ### Problem The streams surface promises portability: code written against `CStream` compiles unchanged against every binding. The kyo binding silently broke that. `kyo.Stream` keys its emit channel by a `Tag[Emit[Chunk[A]]]` re-summoned on every operator, and our inline delegations forwarded that requirement to callers, so generic element-abstract pipelines compiled against ce/zio/future/ox/twitter but not against the binding wrapping Kyo's own stream. Separately, the bindings disagreed on variance (`CStream[+A]` everywhere, but kyo cannot be covariant; see below), which breaks portability the other way. ### Solution Replace the kyo opaque alias with a small invariant `final class CStream[A]` that captures the `Tag` once at construction (where `A` is concrete) and threads it into each delegated `kyo.Stream` call. Element-preserving operators (`take`/`drop`/`filter`/`run`/`fold`/`concat`/...) need no `Tag`; element-changing ones (`map`/`mapPure`/`flatMap`/`collectPure`) take the output `Tag[Emit[Chunk[B]]]`, which derives automatically for concrete `B`. Make all other bindings invariant too (`CStream[A]`) so the portable surface agrees on variance. ### Notes - The kyo carrier is richer than the others: a lowered `kyo.Stream` is a raw computation that can carry several independent effect channels keyed by element type. The stored tag must stay matched to that carrier, which is why the binding is invariant and `lower` needs no re-tagging and no casts. The other bindings' carriers are covariant, so dropping `+` is a pure annotation change; all 38 shared `CStreamTest` cases still pass on every binding. - A root fix in `kyo.Stream` (dropping the per-operator `Tag`) was rejected: it exposes that raw multi-channel emit computation, where the element-typed tag is load-bearing across `Emit`/`Poll`/`Pipe`/`Sink`. - `CStreamTagFreeTest` (kyo binding) covers it: previously-uncompilable generic pipelines, channel isolation across flatMap chains, faithful `lower` after an element change, plus `assertCompiles`/`assertDoesNotCompile` for the tag boundary and invariance.
hearnadam
reviewed
May 29, 2026
|
|
||
| /** Flat-maps each element to another stream and concatenates the results. */ | ||
| def flatMap[B](f: A => CStream[B])(using outTag: Tag[Emit[Chunk[B]]], frame: Frame): CStream[B] = | ||
| given tagA: Tag[Emit[Chunk[A]]] = tag |
Collaborator
There was a problem hiding this comment.
Could we make this tag given in the class?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
The streams surface promises portability: code written against
CStreamcompiles unchanged against every binding. The kyo binding silently broke that.kyo.Streamkeys its emit channel by aTag[Emit[Chunk[A]]]re-summoned on every operator, and our inline delegations forwarded that requirement to callers, so generic element-abstract pipelines compiled against ce/zio/future/ox/twitter but not against the binding wrapping Kyo's own stream. Separately, the bindings disagreed on variance (CStream[+A]everywhere, but kyo cannot be covariant; see below), which breaks portability the other way.Solution
Replace the kyo opaque alias with a small invariant
final class CStream[A]that captures theTagonce at construction (whereAis concrete) and threads it into each delegatedkyo.Streamcall. Element-preserving operators (take/drop/filter/run/fold/concat/...) need noTag; element-changing ones (map/mapPure/flatMap/collectPure) take the outputTag[Emit[Chunk[B]]], which derives automatically for concreteB. Make all other bindings invariant too (CStream[A]) so the portable surface agrees on variance.Notes
kyo.Streamis a raw computation that can carry several independent effect channels keyed by element type. The stored tag must stay matched to that carrier, which is why the binding is invariant andlowerneeds no re-tagging and no casts. The other bindings' carriers are covariant, so dropping+is a pure annotation change; all 38 sharedCStreamTestcases still pass on every binding.kyo.Stream(dropping the per-operatorTag) was rejected: it exposes that raw multi-channel emit computation, where the element-typed tag is load-bearing acrossEmit/Poll/Pipe/Sink.CStreamTagFreeTest(kyo binding) covers it: previously-uncompilable generic pipelines, channel isolation across flatMap chains, faithfullowerafter an element change, plusassertCompiles/assertDoesNotCompilefor the tag boundary and invariance.