Skip to content

Commit 1a94654

Browse files
committed
Add a layer to simplify local testing
1 parent 960ab6e commit 1a94654

File tree

4 files changed

+129
-10
lines changed

4 files changed

+129
-10
lines changed

core/src/main/scala/com/devsisters/shardcake/interfaces/Pods.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ object Pods {
6060

6161
/**
6262
* A layer that creates a service that does nothing when called.
63-
* Useful for testing ShardManager or when using Sharding.local.
63+
* Useful for testing ShardManager or when we don't need messages being sent.
6464
*/
6565
val noop: ULayer[Pods] =
6666
ZLayer.succeed(new Pods {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package com.devsisters.shardcake
2+
3+
import com.devsisters.shardcake.interfaces.{ Pods, Serialization, Storage }
4+
import com.devsisters.shardcake.interfaces.Pods.BinaryMessage
5+
import zio.{ Promise, Queue, RLayer, Task, ULayer, URLayer, ZIO, ZLayer }
6+
import zio.stream.ZStream
7+
8+
object LocalSharding {
9+
10+
private trait LocalQueue {
11+
def localQueue: Queue[LocalQueueMessage]
12+
}
13+
14+
private sealed trait LocalQueueMessage
15+
private object LocalQueueMessage {
16+
case class SendMessage(request: BinaryMessage, response: Promise[Nothing, Option[Array[Byte]]])
17+
extends LocalQueueMessage
18+
case class SendStream(
19+
request: ZStream[Any, Throwable, BinaryMessage],
20+
response: Promise[Nothing, Option[Array[Byte]]]
21+
) extends LocalQueueMessage
22+
case class SendMessageAndReceiveStream(
23+
request: BinaryMessage,
24+
response: Promise[Nothing, ZStream[Any, Throwable, Array[Byte]]]
25+
) extends LocalQueueMessage
26+
case class SendStreamAndReceiveStream(
27+
request: ZStream[Any, Throwable, BinaryMessage],
28+
response: Promise[Nothing, ZStream[Any, Throwable, Array[Byte]]]
29+
) extends LocalQueueMessage
30+
}
31+
32+
private val localQueue: ULayer[LocalQueue] =
33+
ZLayer(
34+
Queue
35+
.unbounded[LocalQueueMessage]
36+
.map(queue =>
37+
new LocalQueue {
38+
def localQueue: Queue[LocalQueueMessage] = queue
39+
}
40+
)
41+
)
42+
43+
private val localPods: URLayer[LocalQueue, Pods] =
44+
ZLayer {
45+
ZIO.serviceWith[LocalQueue](_.localQueue).map { queue =>
46+
new Pods {
47+
def assignShards(pod: PodAddress, shards: Set[ShardId]): Task[Unit] = ZIO.unit
48+
def unassignShards(pod: PodAddress, shards: Set[ShardId]): Task[Unit] = ZIO.unit
49+
def ping(pod: PodAddress): Task[Unit] = ZIO.unit
50+
51+
def sendMessage(pod: PodAddress, message: BinaryMessage): Task[Option[Array[Byte]]] =
52+
Promise.make[Nothing, Option[Array[Byte]]].flatMap { promise =>
53+
queue.offer(LocalQueueMessage.SendMessage(message, promise)) *> promise.await
54+
}
55+
56+
def sendStream(
57+
pod: PodAddress,
58+
entityId: String,
59+
messages: ZStream[Any, Throwable, BinaryMessage]
60+
): Task[Option[Array[Byte]]] =
61+
Promise.make[Nothing, Option[Array[Byte]]].flatMap { promise =>
62+
queue.offer(LocalQueueMessage.SendStream(messages, promise)).fork *> promise.await
63+
}
64+
65+
def sendMessageAndReceiveStream(
66+
pod: PodAddress,
67+
message: BinaryMessage
68+
): ZStream[Any, Throwable, Array[Byte]] =
69+
ZStream.unwrap {
70+
Promise.make[Nothing, ZStream[Any, Throwable, Array[Byte]]].flatMap { promise =>
71+
queue.offer(LocalQueueMessage.SendMessageAndReceiveStream(message, promise)) *> promise.await
72+
}
73+
}
74+
75+
def sendStreamAndReceiveStream(
76+
pod: PodAddress,
77+
entityId: String,
78+
messages: ZStream[Any, Throwable, BinaryMessage]
79+
): ZStream[Any, Throwable, Array[Byte]] =
80+
ZStream.unwrap {
81+
Promise.make[Nothing, ZStream[Any, Throwable, Array[Byte]]].flatMap { promise =>
82+
queue.offer(LocalQueueMessage.SendStreamAndReceiveStream(messages, promise)).fork *> promise.await
83+
}
84+
}
85+
}
86+
}
87+
}
88+
89+
private val localServer: RLayer[Sharding with LocalQueue, Unit] =
90+
ZLayer.scoped {
91+
for {
92+
sharding <- ZIO.service[Sharding]
93+
queue <- ZIO.serviceWith[LocalQueue](_.localQueue)
94+
_ <- ZStream
95+
.fromQueueWithShutdown(queue)
96+
.runForeach {
97+
case LocalQueueMessage.SendMessage(request, response) =>
98+
sharding.sendToLocalEntity(request).flatMap(response.succeed)
99+
case LocalQueueMessage.SendStream(request, response) =>
100+
sharding.sendStreamToLocalEntity(request).flatMap(response.succeed)
101+
case LocalQueueMessage.SendMessageAndReceiveStream(request, response) =>
102+
response.succeed(sharding.sendToLocalEntityAndReceiveStream(request))
103+
case LocalQueueMessage.SendStreamAndReceiveStream(request, response) =>
104+
response.succeed(sharding.sendStreamToLocalEntityAndReceiveStream(request))
105+
}
106+
.forkScoped
107+
} yield ()
108+
}
109+
110+
/**
111+
* A special layer meant for testing that uses a local queue rather than an external transport.
112+
* This layer will only work in a single JVM and is not suitable for production use.
113+
*/
114+
val live: RLayer[ShardManagerClient with Storage with Serialization with Config, Sharding] =
115+
ZLayer.makeSome[ShardManagerClient with Storage with Serialization with Config, Sharding](
116+
localQueue,
117+
localPods,
118+
localServer,
119+
Sharding.live
120+
)
121+
}

entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala

+4-5
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package com.devsisters.shardcake
22

3-
import com.devsisters.shardcake.interfaces.{ Pods, Serialization, Storage }
4-
import zio.{ Config => _, _ }
3+
import com.devsisters.shardcake.interfaces.{ Serialization, Storage }
54
import zio.test.TestAspect.{ sequential, withLiveClock }
65
import zio.test._
6+
import zio.{ Config => _, _ }
77

88
import scala.util.Success
99

1010
object BroadcastingSpec extends ZIOSpecDefault {
1111

12-
private val config = ZLayer.succeed(Config.default)
12+
private val config = ZLayer.succeed(Config.default.copy(simulateRemotePods = true))
1313

1414
def spec: Spec[TestEnvironment with Scope, Any] =
1515
suite("BroadcastingSpec")(
@@ -28,9 +28,8 @@ object BroadcastingSpec extends ZIOSpecDefault {
2828
}
2929
}
3030
).provideShared(
31-
Sharding.live,
3231
Serialization.javaSerialization,
33-
Pods.noop,
32+
LocalSharding.live,
3433
ShardManagerClient.local,
3534
Storage.memory,
3635
config

entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala

+3-4
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package com.devsisters.shardcake
22

33
import com.devsisters.shardcake.CounterActor.CounterMessage._
44
import com.devsisters.shardcake.CounterActor._
5-
import com.devsisters.shardcake.interfaces.{ Pods, Serialization, Storage }
6-
import zio.{ Config => _, _ }
5+
import com.devsisters.shardcake.interfaces.{ Serialization, Storage }
76
import zio.stream.{ SubscriptionRef, ZStream }
87
import zio.test.TestAspect.{ sequential, withLiveClock }
98
import zio.test._
9+
import zio.{ Config => _, _ }
1010

1111
object ShardingSpec extends ZIOSpecDefault {
1212
def spec: Spec[TestEnvironment with Scope, Any] =
@@ -144,9 +144,8 @@ object ShardingSpec extends ZIOSpecDefault {
144144
}
145145
}
146146
).provideShared(
147-
Sharding.live,
148147
Serialization.javaSerialization,
149-
Pods.noop,
148+
LocalSharding.live,
150149
ShardManagerClient.local,
151150
Storage.memory,
152151
ZLayer.succeed(Config.default)

0 commit comments

Comments
 (0)