Skip to content

Commit db9818e

Browse files
authored
Improve Support For Single Threaded Environments (zio#4115)
* initial work * refactor RTSSPec tests to avoid blocking * yield in more combinators * don't block when canceling * don't optimize away zero duration sleeps * fix Dotty type inference issue * reimplement ZSTM#orTry * don't optimize away zero duration sleeps * don't optimize away zero duration sleeps * revert runtime settings * fix flaky test * don't use deprecated methods * use nonFlaky
1 parent 26cff65 commit db9818e

File tree

20 files changed

+106
-115
lines changed

20 files changed

+106
-115
lines changed

.vscode/settings.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,8 @@
44
"unensures",
55
"untrack",
66
"urio"
7-
]
7+
],
8+
"files.watcherExclude": {
9+
"**/target": true
10+
}
811
}

benchmarks/src/main/scala/zio/stm/TMapContentionBenchmarks.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,13 @@ class TMapContentionBenchmarks {
2424

2525
@Setup(Level.Trial)
2626
def setup(): Unit = {
27-
val schedule = Schedule.recurs(repeatedUpdates)
2827
val keysToUpdate = (1 to 100).toList
2928
val data = (1 to 1000).toList.zipWithIndex
3029
val map = unsafeRun(TMap.fromIterable(data).commit)
3130
val ref = ZTRef.unsafeMake(data.toMap)
3231

33-
mapUpdates = ZIO.foreachPar_(keysToUpdate)(i => map.put(i, i).commit.repeat(schedule))
34-
refUpdates = ZIO.foreachPar_(keysToUpdate)(i => ref.update(_.updated(i, i)).commit.repeat(schedule))
32+
mapUpdates = ZIO.foreachPar_(keysToUpdate)(i => map.put(i, i).commit.repeatN(repeatedUpdates))
33+
refUpdates = ZIO.foreachPar_(keysToUpdate)(i => ref.update(_.updated(i, i)).commit.repeatN(repeatedUpdates))
3534
}
3635

3736
@Benchmark

core-tests/jvm/src/test/scala-2.12/zio/StacktracesSpec.scala

+1-5
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,7 @@ object StackTracesSpec extends DefaultRunnableSpec {
129129
assert(cause.traces)(isNonEmpty) &&
130130
assert(cause.traces.head.parentTrace.isEmpty)(isFalse) &&
131131
assert(cause.traces.head.parentTrace.get.parentTrace.isEmpty)(isFalse) &&
132-
assert(cause.traces.head.parentTrace.get.parentTrace.get.parentTrace.isEmpty)(isFalse) &&
133-
assert(cause.traces.head.parentTrace.get.parentTrace.get.parentTrace.get.parentTrace.isEmpty)(isFalse) &&
134-
assert(cause.traces.head.parentTrace.get.parentTrace.get.parentTrace.get.parentTrace.get.parentTrace.isEmpty)(
135-
isTrue
136-
)
132+
assert(cause.traces.head.parentTrace.get.parentTrace.get.parentTrace.isEmpty)(isFalse)
137133
}
138134
},
139135
testM("fiber ancestry example with uploads") {

core-tests/jvm/src/test/scala/zio/CancelableFutureSpecJVM.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import java.util.concurrent.Executors
44

55
import scala.concurrent.ExecutionContext
66

7-
import zio.duration._
87
import zio.internal.Executor
98
import zio.test.Assertion._
109
import zio.test.TestAspect._
@@ -34,6 +33,6 @@ object CancelableFutureSpecJVM extends ZIOBaseSpec {
3433
)
3534
).unsafeRun(tst)
3635
)
37-
} @@ timeout(1.second)
36+
} @@ nonFlaky
3837
) @@ zioTag(future)
3938
}

core-tests/jvm/src/test/scala/zio/RTSSpec.scala

+16-14
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicInteger
66
import zio.clock.Clock
77
import zio.duration._
88
import zio.test.Assertion._
9-
import zio.test.TestAspect.{ jvm, nonFlaky, silent }
9+
import zio.test.TestAspect.{ nonFlaky, silent }
1010
import zio.test._
1111
import zio.test.environment.Live
1212

@@ -35,29 +35,31 @@ object RTSSpec extends ZIOBaseSpec {
3535
testM("blocking IO is effect blocking") {
3636
for {
3737
done <- Ref.make(false)
38-
start <- IO.succeed(internal.OneShot.make[Unit])
39-
fiber <- blocking.effectBlockingInterrupt { start.set(()); Thread.sleep(60L * 60L * 1000L) }
38+
start <- Promise.make[Nothing, Unit]
39+
fiber <- blocking.effectBlockingInterrupt { start.unsafeDone(IO.unit); Thread.sleep(60L * 60L * 1000L) }
4040
.ensuring(done.set(true))
4141
.fork
42-
_ <- IO.succeed(start.get())
42+
_ <- start.await
4343
res <- fiber.interrupt
4444
value <- done.get
4545
} yield assert(res)(isInterrupted) && assert(value)(isTrue)
46-
},
46+
} @@ nonFlaky,
4747
testM("cancelation is guaranteed") {
4848
val io =
4949
for {
50-
release <- zio.Promise.make[Nothing, Int]
51-
latch = internal.OneShot.make[Unit]
52-
async = IO.effectAsyncInterrupt[Nothing, Unit] { _ => latch.set(()); Left(release.succeed(42).unit) }
53-
fiber <- async.fork
54-
_ <- IO.effectTotal(latch.get(1000))
55-
_ <- fiber.interrupt.fork
56-
result <- release.await
50+
release <- Promise.make[Nothing, Int]
51+
latch <- Promise.make[Nothing, Unit]
52+
async = IO.effectAsyncInterrupt[Nothing, Unit] { _ =>
53+
latch.unsafeDone(IO.unit); Left(release.succeed(42).unit)
54+
}
55+
fiber <- async.fork
56+
_ <- latch.await
57+
_ <- fiber.interrupt.fork
58+
result <- release.await
5759
} yield result == 42
5860

5961
assertM(io)(isTrue)
60-
},
62+
} @@ nonFlaky,
6163
testM("Fiber dump looks correct") {
6264
for {
6365
promise <- Promise.make[Nothing, Int]
@@ -93,7 +95,7 @@ object RTSSpec extends ZIOBaseSpec {
9395
} yield (startValue + exitValue) == 42
9496

9597
assertM(io)(isTrue)
96-
} @@ zioTag(interruption) @@ jvm(nonFlaky),
98+
} @@ zioTag(interruption) @@ nonFlaky,
9799
testM("deadlock regression 1") {
98100
import java.util.concurrent.Executors
99101

core-tests/shared/src/test/scala/zio/CancelableFutureSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ object CancelableFutureSpec extends ZIOBaseSpec {
5151
_ <- UIO(f.cancel())
5252
r <- ZIO.fromFuture(_ => f).run
5353
} yield assert(r.succeeded)(isFalse) // not interrupted, as the Future fails when the effect in interrupted.
54-
} @@ timeout(1.second) @@ jvmOnly @@ zioTag(interruption),
54+
} @@ nonFlaky @@ zioTag(interruption),
5555
testM("roundtrip preserves interruptibility") {
5656
for {
5757
start <- Promise.make[Nothing, Unit]

core-tests/shared/src/test/scala/zio/ZIOSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -859,7 +859,7 @@ object ZIOSpec extends ZIOBaseSpec {
859859
assert(result3.dieOption)(isSome(equalTo(boom))) && assert(result3.interrupted)(isTrue)
860860
}
861861
}
862-
}
862+
} @@ nonFlaky
863863
),
864864
suite("forkAs")(
865865
testM("child has specified name") {
@@ -2284,7 +2284,7 @@ object ZIOSpec extends ZIOBaseSpec {
22842284
testM("deep fork/join identity") {
22852285
val n = 20
22862286
assertM(concurrentFib(n))(equalTo(fib(n)))
2287-
},
2287+
} @@ jvmOnly,
22882288
testM("effectAsyncM creation is interruptible") {
22892289
for {
22902290
release <- Promise.make[Nothing, Int]

core-tests/shared/src/test/scala/zio/ZQueueSpec.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,15 @@ object ZQueueSpec extends ZIOBaseSpec {
5050
f <- IO.forkAll(values.map(queue.offer))
5151
_ <- waitForSize(queue, 10)
5252
out <- Ref.make[List[Int]](Nil)
53-
_ <- queue.take.flatMap(i => out.update(i :: _)).repeat(Schedule.recurs(9))
53+
_ <- queue.take.flatMap(i => out.update(i :: _)).repeatN(9)
5454
l <- out.get
5555
_ <- f.join
5656
} yield assert(l.toSet)(equalTo(values.toSet))
5757
},
5858
testM("offers are suspended by back pressure") {
5959
for {
6060
queue <- Queue.bounded[Int](10)
61-
_ <- queue.offer(1).repeat(Schedule.recurs(9))
61+
_ <- queue.offer(1).repeatN(9)
6262
refSuspended <- Ref.make[Boolean](true)
6363
f <- (queue.offer(2) *> refSuspended.set(false)).fork
6464
_ <- waitForSize(queue, 11)
@@ -73,7 +73,7 @@ object ZQueueSpec extends ZIOBaseSpec {
7373
f <- IO.forkAll(values.map(queue.offer))
7474
_ <- waitForSize(queue, 10)
7575
out <- Ref.make[List[Int]](Nil)
76-
_ <- queue.take.flatMap(i => out.update(i :: _)).repeat(Schedule.recurs(9))
76+
_ <- queue.take.flatMap(i => out.update(i :: _)).repeatN(9)
7777
l <- out.get
7878
_ <- f.join
7979
} yield assert(l.toSet)(equalTo(values.toSet))
@@ -270,7 +270,7 @@ object ZQueueSpec extends ZIOBaseSpec {
270270
getter = queue.takeBetween(5, 10)
271271
_ <- getter.race(updater)
272272
count <- counter.get
273-
} yield assert(count > 5)(isTrue)
273+
} yield assert(count >= 5)(isTrue)
274274
}
275275
),
276276
testM("offerAll with takeAll") {

core-tests/shared/src/test/scala/zio/ZScopeSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ object ZScopeSpec extends ZIOBaseSpec {
3434
testM("close can be called multiple times") {
3535
for {
3636
open <- ZScope.make[Unit]
37-
_ <- open.close(()).repeat(Schedule.recurs(10))
37+
_ <- open.close(()).repeatN(10)
3838
value <- open.scope.closed
3939
} yield assert(value)(isTrue)
4040
},

core-tests/shared/src/test/scala/zio/stm/TMapSpec.scala

+6-7
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package zio.stm
1818

1919
import zio.test.Assertion._
2020
import zio.test._
21-
import zio.{ Schedule, URIO, ZIOBaseSpec }
21+
import zio.{ URIO, ZIOBaseSpec }
2222

2323
object TMapSpec extends ZIOBaseSpec {
2424

@@ -281,12 +281,11 @@ object TMapSpec extends ZIOBaseSpec {
281281
},
282282
testM("parallel value transformation") {
283283
for {
284-
tmap <- TMap.make("a" -> 0).commit
285-
policy = Schedule.recurs(999)
286-
tx = tmap.transformValues(_ + 1).commit.repeat(policy)
287-
n = 2
288-
_ <- URIO.collectAllPar_(List.fill(n)(tx))
289-
res <- tmap.get("a").commit
284+
tmap <- TMap.make("a" -> 0).commit
285+
tx = tmap.transformValues(_ + 1).commit.repeatN(999)
286+
n = 2
287+
_ <- URIO.collectAllPar_(List.fill(n)(tx))
288+
res <- tmap.get("a").commit
290289
} yield assert(res)(isSome(equalTo(2000)))
291290
},
292291
testM("transformValuesM") {

core-tests/shared/src/test/scala/zio/stm/ZSTMSpec.scala

+6-6
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package stm
33

44
import zio.duration._
55
import zio.test.Assertion._
6-
import zio.test.TestAspect.{ jvmOnly, nonFlaky }
6+
import zio.test.TestAspect.nonFlaky
77
import zio.test._
88
import zio.test.environment.Live
99

@@ -830,8 +830,8 @@ object ZSTMSpec extends ZIOBaseSpec {
830830
for {
831831
sender <- TRef.makeCommit(50)
832832
receiver <- TRef.makeCommit(0)
833-
toReceiver10 = transfer(receiver, sender, 100).repeat(Schedule.recurs(9))
834-
toSender10 = transfer(sender, receiver, 100).repeat(Schedule.recurs(9))
833+
toReceiver10 = transfer(receiver, sender, 100).repeatN(9)
834+
toSender10 = transfer(sender, receiver, 100).repeatN(9)
835835
f <- toReceiver10.zipPar(toSender10).fork
836836
_ <- sender.update(_ + 50).commit
837837
_ <- f.join
@@ -1103,7 +1103,7 @@ object ZSTMSpec extends ZIOBaseSpec {
11031103
updater = ref.update(_ + 10).commit.forever
11041104
res <- (left <|> right).commit.race(updater)
11051105
} yield assert(res)(equalTo("left"))
1106-
} @@ jvmOnly,
1106+
},
11071107
testM("fails if left fails") {
11081108
val left = STM.fail("left")
11091109
val right = STM.succeed("right")
@@ -1433,7 +1433,7 @@ object ZSTMSpec extends ZIOBaseSpec {
14331433
_ <- tvar.set(v + 1)
14341434
v <- tvar.get
14351435
} yield v)
1436-
.repeat(Schedule.recurs(n) *> Schedule.identity)
1436+
.repeatN(n)
14371437

14381438
def compute3VarN(
14391439
n: Int,
@@ -1450,7 +1450,7 @@ object ZSTMSpec extends ZIOBaseSpec {
14501450
_ <- tvar1.set(v1 - 1)
14511451
_ <- tvar2.set(v2 + 1)
14521452
} yield v3)
1453-
.repeat(Schedule.recurs(n) *> Schedule.identity)
1453+
.repeatN(n)
14541454

14551455
def transfer(receiver: TRef[Int], sender: TRef[Int], much: Int): UIO[Int] =
14561456
STM.atomically {

core/js/src/main/scala/zio/clock/PlatformSpecific.scala

-4
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@ private[clock] trait PlatformSpecific {
2929

3030
override def schedule(task: Runnable, duration: Duration): CancelToken = duration match {
3131
case Duration.Infinity => ConstFalse
32-
case Duration.Zero =>
33-
task.run()
34-
35-
ConstFalse
3632
case Duration.Finite(_) =>
3733
var completed = false
3834

core/jvm/src/main/scala/zio/clock/PlatformSpecific.scala

-4
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,6 @@ private[clock] trait PlatformSpecific {
3232

3333
override def schedule(task: Runnable, duration: Duration): CancelToken = duration match {
3434
case Duration.Infinity => ConstFalse
35-
case Duration.Zero =>
36-
task.run()
37-
38-
ConstFalse
3935
case Duration.Finite(_) =>
4036
val future = service.schedule(
4137
new Runnable {

core/native/src/main/scala/zio/clock/PlatformSpecific.scala

-4
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@ private[clock] trait PlatformSpecific {
2929

3030
override def schedule(task: Runnable, duration: Duration): CancelToken = duration match {
3131
case Duration.Infinity => ConstFalse
32-
case Duration.Zero =>
33-
task.run()
34-
35-
ConstFalse
3632
case Duration.Finite(_) =>
3733
var completed = false
3834

core/shared/src/main/scala/zio/Runtime.scala

+19-6
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,13 @@ trait Runtime[+R] {
103103
* This method is effectful and should only be invoked at the edges of your program.
104104
*/
105105
final def unsafeRunAsyncCancelable[E, A](zio: => ZIO[R, E, A])(k: Exit[E, A] => Any): Fiber.Id => Exit[E, A] = {
106-
lazy val curZIO = if (Platform.isJS) zio else ZIO.yieldNow *> zio
107-
unsafeRunWith(curZIO)(k)
106+
lazy val curZio = if (Platform.isJS) zio else ZIO.yieldNow *> zio
107+
val canceler = unsafeRunWith(curZio)(k)
108+
fiberId => {
109+
val result = internal.OneShot.make[Exit[E, A]]
110+
canceler(fiberId)(result.set)
111+
result.get()
112+
}
108113
}
109114

110115
/**
@@ -122,9 +127,15 @@ trait Runtime[+R] {
122127
*/
123128
final def unsafeRunToFuture[E <: Throwable, A](zio: ZIO[R, E, A]): CancelableFuture[A] = {
124129
val p: concurrent.Promise[A] = scala.concurrent.Promise[A]()
125-
val canceler = unsafeRunAsyncCancelable(zio)(_.fold(cause => p.failure(cause.squashTraceWith(identity)), p.success))
130+
131+
val canceler = unsafeRunWith(zio)(_.fold(cause => p.failure(cause.squashTraceWith(identity)), p.success))
132+
126133
new CancelableFuture[A](p.future) {
127-
def cancel(): Future[Exit[Throwable, A]] = Future.successful(canceler(Fiber.Id.None))
134+
def cancel(): Future[Exit[Throwable, A]] = {
135+
val p: concurrent.Promise[Exit[Throwable, A]] = scala.concurrent.Promise[Exit[Throwable, A]]()
136+
canceler(Fiber.Id.None)(p.success)
137+
p.future
138+
}
128139
}
129140
}
130141

@@ -163,7 +174,9 @@ trait Runtime[+R] {
163174
*/
164175
def withTracingConfig(config: TracingConfig): Runtime[R] = mapPlatform(_.withTracingConfig(config))
165176

166-
private final def unsafeRunWith[E, A](zio: => ZIO[R, E, A])(k: Exit[E, A] => Any): Fiber.Id => Exit[E, A] = {
177+
private final def unsafeRunWith[E, A](
178+
zio: => ZIO[R, E, A]
179+
)(k: Exit[E, A] => Any): Fiber.Id => (Exit[E, A] => Any) => Unit = {
167180
val InitialInterruptStatus = InterruptStatus.Interruptible
168181

169182
val fiberId = Fiber.newFiberId()
@@ -194,7 +207,7 @@ trait Runtime[+R] {
194207
context.evaluateNow(ZIOFn.recordStackTrace(() => zio)(zio.asInstanceOf[IO[E, A]]))
195208
context.runAsync(k)
196209

197-
fiberId => unsafeRun(context.interruptAs(fiberId))
210+
fiberId => k => unsafeRunAsync(context.interruptAs(fiberId))((exit: Exit[Nothing, Exit[E, A]]) => k(exit.flatten))
198211
}
199212
}
200213

0 commit comments

Comments
 (0)