Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add storage client using Redisson #108

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ val zioCatsInteropVersion = "23.1.0.0"
val sttpVersion = "3.9.1"
val calibanVersion = "2.4.3"
val redis4catsVersion = "1.5.2"
val redissonVersion = "3.23.0"
val scalaKryoVersion = "1.0.2"
val testContainersVersion = "0.40.9"

Expand Down Expand Up @@ -53,6 +54,7 @@ lazy val root = project
entities,
healthK8s,
storageRedis,
storageRedisson,
serializationKryo,
grpcProtocol,
examples
Expand Down Expand Up @@ -126,6 +128,18 @@ lazy val storageRedis = project
)
)

lazy val storageRedisson = project
.in(file("storage-redisson"))
.settings(name := "shardcake-storage-redisson")
.settings(commonSettings)
.dependsOn(core)
.settings(
libraryDependencies ++=
Seq(
"org.redisson" % "redisson" % redissonVersion
)
)

lazy val serializationKryo = project
.in(file("serialization-kryo"))
.settings(name := "shardcake-serialization-kryo")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.devsisters.shardcake

/**
* The configuration for the Redis storage implementation.
* @param assignmentsKey the key to store shard assignments
* @param podsKey the key to store registered pods
*/
case class RedisConfig(assignmentsKey: String, podsKey: String)

object RedisConfig {
val default: RedisConfig = RedisConfig(assignmentsKey = "shard_assignments", podsKey = "pods")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.devsisters.shardcake

import scala.jdk.CollectionConverters._

import com.devsisters.shardcake.interfaces.Storage
import org.redisson.api.RedissonClient
import org.redisson.api.listener.MessageListener
import org.redisson.client.codec.StringCodec
import zio.stream.ZStream
import zio.{ Queue, Task, Unsafe, ZIO, ZLayer }

object StorageRedis {

/**
* A layer that returns a Storage implementation using Redis
*/
val live: ZLayer[RedissonClient with RedisConfig, Nothing, Storage] =
ZLayer {
for {
config <- ZIO.service[RedisConfig]
redisClient <- ZIO.service[RedissonClient]
assignmentsMap = redisClient.getMap[String, String](config.assignmentsKey)
podsMap = redisClient.getMap[String, String](config.podsKey)
assignmentsTopic = redisClient.getTopic(config.assignmentsKey, StringCodec.INSTANCE)
} yield new Storage {
def getAssignments: Task[Map[ShardId, Option[PodAddress]]] =
ZIO
.fromCompletionStage(assignmentsMap.readAllEntrySetAsync())
.map(
_.asScala
.flatMap(entry =>
entry.getKey.toIntOption.map(
_ -> (if (entry.getValue.isEmpty) None
else PodAddress(entry.getValue))
)
)
.toMap
)
def saveAssignments(assignments: Map[ShardId, Option[PodAddress]]): Task[Unit] =
ZIO.fromCompletionStage(assignmentsMap.putAllAsync(assignments.map { case (k, v) =>
k.toString -> v.fold("")(_.toString)
}.asJava)) *>
ZIO.fromCompletionStage(assignmentsTopic.publishAsync("ping")).unit
def assignmentsStream: ZStream[Any, Throwable, Map[ShardId, Option[PodAddress]]] =
ZStream.unwrap {
for {
queue <- Queue.unbounded[String]
runtime <- ZIO.runtime[Any]
_ <- ZIO.fromCompletionStage(
assignmentsTopic.addListenerAsync(
classOf[String],
new MessageListener[String] {
def onMessage(channel: CharSequence, msg: String): Unit =
Unsafe.unsafe(implicit unsafe => runtime.unsafe.run(queue.offer(msg)))
}
)
)
} yield ZStream.fromQueueWithShutdown(queue).mapZIO(_ => getAssignments)
}
def getPods: Task[Map[PodAddress, Pod]] =
ZIO
.fromCompletionStage(podsMap.readAllEntrySetAsync())
.map(
_.asScala
.flatMap(entry => PodAddress(entry.getKey).map(address => address -> Pod(address, entry.getValue)))
.toMap
)
def savePods(pods: Map[PodAddress, Pod]): Task[Unit] =
ZIO.fromCompletionStage(podsMap.putAllAsync(pods.map { case (k, v) => k.toString -> v.version }.asJava)).unit
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.interfaces.Storage
import com.dimafeng.testcontainers.GenericContainer
import org.redisson.Redisson
import org.redisson.config.{ Config => RedissonConfig }
import org.redisson.api.RedissonClient
import zio.Clock.ClockLive
import zio._
import zio.stream.ZStream
import zio.test.TestAspect.sequential
import zio.test._

object StorageRedisSpec extends ZIOSpecDefault {
val container: ZLayer[Any, Nothing, GenericContainer] =
ZLayer.scoped {
ZIO.acquireRelease {
ZIO.attemptBlocking {
val container = new GenericContainer(dockerImage = "redis:6.2.5", exposedPorts = Seq(6379))
container.start()
container
}.orDie
}(container => ZIO.attemptBlocking(container.stop()).orDie)
}

val redis: ZLayer[GenericContainer, Throwable, RedissonClient] =
ZLayer {
for {
container <- ZIO.service[GenericContainer]
uri = s"redis://foobared@${container.host}:${container.mappedPort(container.exposedPorts.head)}"
redissonConfig = new RedissonConfig()
_ = redissonConfig.useSingleServer().setAddress(uri)
client = Redisson.create(redissonConfig)
} yield client
}

def spec: Spec[TestEnvironment with Scope, Any] =
suite("StorageRedisSpec")(
test("save and get pods") {
val expected = List(Pod(PodAddress("host1", 1), "1.0.0"), Pod(PodAddress("host2", 2), "2.0.0"))
.map(p => p.address -> p)
.toMap
for {
_ <- ZIO.serviceWithZIO[Storage](_.savePods(expected))
actual <- ZIO.serviceWithZIO[Storage](_.getPods)
} yield assertTrue(expected == actual)
},
test("save and get assignments") {
val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None)
for {
_ <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected))
actual <- ZIO.serviceWithZIO[Storage](_.getAssignments)
} yield assertTrue(expected == actual)
},
test("assignments stream") {
val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None)
for {
p <- Promise.make[Nothing, Map[Int, Option[PodAddress]]]
_ <- ZStream.serviceWithStream[Storage](_.assignmentsStream).runForeach(p.succeed(_)).fork
_ <- ClockLive.sleep(1 second)
_ <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected))
actual <- p.await
} yield assertTrue(expected == actual)
}
).provideLayerShared(
container >>> redis ++ ZLayer.succeed(RedisConfig.default) >>> StorageRedis.live
) @@ sequential
}
2 changes: 1 addition & 1 deletion vuepress/docs/docs/customization.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ trait Storage {

For testing, you can use the `Storage.memory` layer that keeps data in memory.

Shardcake provides an implementation of `Storage` using Redis. To use it, add the following dependency:
Shardcake provides an implementation of `Storage` using Redis with the Redis4cats library (there's also an alternative using Redisson). To use it, add the following dependency:
```scala
libraryDependencies += "com.devsisters" %% "shardcake-storage-redis" % "2.1.0"
```
Expand Down