@@ -52,12 +52,12 @@ trait ReadChannel[F[_], A]:
52
52
* Can be used only inside async block.
53
53
* If stream is closed and no values to read left in the stream - throws StreamClosedException
54
54
**/
55
- transparent inline def read (): A = await(aread())(using rAsyncMonad)
55
+ transparent inline def read ()( using CpsMonadContext [ F ]) : A = await(aread())(using rAsyncMonad)
56
56
57
57
/**
58
58
* Synonim for read.
59
59
*/
60
- transparent inline def ? : A = await(aread())(using rAsyncMonad)
60
+ transparent inline def ? ( using CpsMonadContext [ F ]) : A = await(aread())(using rAsyncMonad)
61
61
62
62
/**
63
63
* return F which contains sequence from first `n` elements.
@@ -83,7 +83,7 @@ trait ReadChannel[F[_], A]:
83
83
* take first `n` elements.
84
84
* should be called inside async block.
85
85
**/
86
- transparent inline def take (n : Int ): IndexedSeq [A ] =
86
+ transparent inline def take (n : Int )( using CpsMonadContext [ F ]) : IndexedSeq [A ] =
87
87
await(atake(n))(using rAsyncMonad)
88
88
89
89
/**
@@ -107,7 +107,7 @@ trait ReadChannel[F[_], A]:
107
107
*
108
108
* should be called inside async block.
109
109
**/
110
- transparent inline def optRead (): Option [A ] = await(aOptRead())(using rAsyncMonad)
110
+ transparent inline def optRead ()( using CpsMonadContext [ F ]) : Option [A ] = await(aOptRead())(using rAsyncMonad)
111
111
112
112
def foreach_async (f : A => F [Unit ]): F [Unit ] =
113
113
given CpsAsyncMonad [F ] = asyncMonad
@@ -130,7 +130,7 @@ trait ReadChannel[F[_], A]:
130
130
* run code each time when new object is arriced.
131
131
* until end of stream is not reached
132
132
**/
133
- transparent inline def foreach (inline f : A => Unit ): Unit =
133
+ transparent inline def foreach (inline f : A => Unit )( using CpsMonadContext [ F ]) : Unit =
134
134
await(aforeach(f))(using rAsyncMonad)
135
135
136
136
@@ -170,8 +170,8 @@ trait ReadChannel[F[_], A]:
170
170
s
171
171
}
172
172
173
- transparent inline def fold [S ](inline s0: S )(inline f : (S ,A ) => S ): S =
174
- await[F ,S ](afold(s0)(f))(using rAsyncMonad)
173
+ transparent inline def fold [S ](inline s0: S )(inline f : (S ,A ) => S )( using mc : CpsMonadContext [ F ]) : S =
174
+ await[F ,S , F ](afold(s0)(f))(using rAsyncMonad, mc )
175
175
176
176
def zip [B ](x : ReadChannel [F ,B ]): ReadChannel [F ,(A ,B )] =
177
177
given CpsSchedulingMonad [F ] = asyncMonad
@@ -317,14 +317,18 @@ object ReadChannel:
317
317
}
318
318
319
319
320
-
320
+
321
321
import cps .stream ._
322
322
323
- given emitAbsorber [F [_]: CpsSchedulingMonad , T ](using gopherApi : Gopher [F ]): BaseUnfoldCpsAsyncEmitAbsorber [ReadChannel [F ,T ],F ,T ](
324
- using gopherApi.asyncMonad, gopherApi.taskExecutionContext ) with
323
+ given emitAbsorber [F [_], C <: CpsMonadContext [ F ], T ](using auxMonad : CpsSchedulingMonad [ F ]{ type Context = C }, gopherApi : Gopher [F ]): BaseUnfoldCpsAsyncEmitAbsorber [ReadChannel [F ,T ],F , C ,T ](
324
+ using gopherApi.taskExecutionContext, auxMonad ) with
325
325
326
326
override type Element = T
327
327
328
+ def asSync (fs : F [ReadChannel [F ,T ]]): ReadChannel [F ,T ] =
329
+ DelayedReadChannel (fs)
330
+
331
+
328
332
def unfold [S ](s0: S )(f : S => F [Option [(T ,S )]]): ReadChannel [F ,T ] =
329
333
val r : ReadChannel [F ,T ] = unfoldAsync(s0)(f)
330
334
r
0 commit comments