Skip to content
This repository has been archived by the owner on Aug 11, 2023. It is now read-only.

Commit

Permalink
Added caching for akka pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
vitaliihonta committed Feb 6, 2019
1 parent 4b49cd6 commit 59f622a
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 10 deletions.
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,19 @@ libraryDependencies += "ua.pp.itkpi" %% "trembita-java-streams" % trembitaV

See [sources](integrations/java/streams) and [tests](integrations/java/streams/src/test/scala/trembita/jstreams) for examples

## Seamless akka infinispan integration
```scala
libraryDependencies += "ua.pp.itkpi" %% "trembita-seamless-akka-infinispan" % trembitaV
```
Allows caching of Akka streams.
See [example](integrations/seamless/akka-infinispan/src/test/scala/trembita.seamless.akka_infinispan/CachingSpec.scala)


## To be done
- [x] caching
- [x] integration with distributed streaming frameworks
- [ ] tensorflow
- [ ] slick (in progress)
- [ ] akka http output
- [ ] slick

## Additional information
My speech about trembita at the ScalaUA conference:
Expand Down
37 changes: 29 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ lazy val trembita_spark =
}
)

lazy val trembita_akka_streamns =
lazy val trembita_akka_streams =
sonatypeProject(
id = "trembita-akka-streams",
base = file("./integrations/akka/streams")
Expand All @@ -138,7 +138,7 @@ lazy val seamless_akka_spark =
sonatypeProject(
id = "trembita-seamless-akka-spark",
base = file("./integrations/seamless/akka-spark")
).dependsOn(kernel, trembita_akka_streamns, trembita_spark)
).dependsOn(kernel, trembita_akka_streams, trembita_spark)
.settings(
name := "trembita-seamless-akka-spark",
version := v,
Expand Down Expand Up @@ -205,6 +205,24 @@ lazy val trembita_java_streams =
libraryDependencies += ScalaCompat.java8compat
)

// Seamless integration module: caching of Akka Streams pipelines backed by Infinispan.
// Builds on the kernel, the Akka Streams integration and the Infinispan caching module.
lazy val seamless_akka_infinispan =
  sonatypeProject(
    id = "trembita-seamless-akka-infinispan",
    base = file("./integrations/seamless/akka-infinispan")
  ).dependsOn(kernel, trembita_akka_streams, trembita_caching_infinispan)
    .settings(
      name := "trembita-seamless-akka-infinispan",
      version := v,
      // Compiler flags: partial unification for higher-kinded inference, macros enabled, Java 8 bytecode.
      scalacOptions ++= Seq("-Ypartial-unification", "-language:experimental.macros", "-target:jvm-1.8"),
      libraryDependencies += ScalaCompat.java8compat,
      libraryDependencies += Akka.testkit,
      // Mockito is needed by the test suite only.
      libraryDependencies += Testing.mockito % "test"
    )

lazy val bench = Project(id = "trembita-bench", base = file("./bench"))
.enablePlugins(JmhPlugin)
.dependsOn(
Expand All @@ -214,12 +232,13 @@ lazy val bench = Project(id = "trembita-bench", base = file("./bench"))
cassandra_connector,
cassandra_connector_phantom,
trembita_spark,
trembita_akka_streamns,
trembita_akka_streams,
seamless_akka_spark,
trembita_spark_streaming,
trembita_caching,
trembita_caching_infinispan,
trembita_java_streams
trembita_java_streams,
seamless_akka_infinispan
)
.settings(
name := "trembita-bench",
Expand Down Expand Up @@ -252,12 +271,13 @@ lazy val examples = Project(id = "trembita-examples", base = file("./examples"))
cassandra_connector,
cassandra_connector_phantom,
trembita_spark,
trembita_akka_streamns,
trembita_akka_streams,
seamless_akka_spark,
trembita_spark_streaming,
trembita_caching,
trembita_caching_infinispan,
trembita_java_streams
trembita_java_streams,
seamless_akka_infinispan
)
.settings(
name := "trembita-examples",
Expand Down Expand Up @@ -315,14 +335,15 @@ lazy val root = Project(id = "trembita", base = file("."))
cassandra_connector,
cassandra_connector_phantom,
trembita_spark,
trembita_akka_streamns,
trembita_akka_streams,
seamless_akka_spark,
trembita_spark_streaming,
trembita_caching,
trembita_caching_infinispan,
log4j,
logging_commons,
trembita_java_streams
trembita_java_streams,
seamless_akka_infinispan
)
.settings(
name := "trembita",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package trembita.seamless.akka_infinispan

import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.stage._
import cats.Monad
import cats.effect.Sync
import org.infinispan.commons.api.BasicCache
import trembita.Environment
import trembita.akka_streams.Akka
import trembita.caching._
import trembita.caching.infinispan._
import trembita.util.LiftFuture
import cats.implicits._
import scala.concurrent.ExecutionContext
import scala.language.higherKinds
import scala.reflect.ClassTag

/**
  * [[Caching]] implementation for Akka Streams pipelines that stores stream contents in an
  * Infinispan [[BasicCache]].
  *
  * A cached entry is a pair of the fully collected stream elements and the materialized value
  * obtained when the stream was run.
  *
  * @param cache             cache keyed by cache name, holding `(elements, materialized value)` pairs
  * @param expirationTimeout lifespan passed to the cache when an entry is stored
  * @param F                 `Sync` instance used to suspend side effects
  * @param liftFuture        lifts Scala futures into `F`
  * @tparam F   effect type
  * @tparam Mat materialized value type of the cached source
  * @tparam A   element type of the cached source
  */
class InfinispanAkkaCaching[F[_], Mat, A: ClassTag](
    cache: BasicCache[String, (Vector[A], Mat)],
    expirationTimeout: ExpirationTimeout,
    F: Sync[F],
    liftFuture: LiftFuture[F]
)(implicit fCtg: ClassTag[F[_]], mat: Materializer, ec: ExecutionContext)
    extends Caching[F, Akka[Mat], A] {

  protected implicit def monad: Monad[F] = F

  protected def timeout: ExpirationTimeout = expirationTimeout

  /**
    * Runs `repr` to completion, collecting its elements into a `Vector`, and then stores the
    * elements together with the materialized value under `cacheName` via `putIfAbsentAsync`,
    * using the configured expiration timeout as the entry lifespan.
    *
    * The stream is only materialized when the returned effect is evaluated.
    */
  protected def cacheRepr(cacheName: String, repr: Source[A, Mat]): F[Unit] =
    F.suspend {
      // Step 1: drain the source, keeping both the materialized value and the collected elements.
      val collected = liftFuture {
        val (materialized, elementsF) =
          repr.toMat(Sink.collection[A, Vector[A]])(Keep.both).run()
        elementsF.map(elements => (elements, materialized))
      }
      // Step 2: write the collected entry to the cache; the value returned by the cache is discarded.
      val stored = collected.flatMap { entry =>
        liftFuture {
          val ttl = expirationTimeout.duration
          cache.putIfAbsentAsync(cacheName, entry, ttl.length, ttl.unit).toScalaOpt
        }
      }
      stored.void
    }

  /**
    * Looks up `cacheName` in the cache. When an entry is found, it is exposed as a `Source`
    * emitting the stored elements and materializing to the stored materialized value.
    */
  protected def getFromCache(cacheName: String): F[Option[Source[A, Mat]]] = {
    // NOTE(review): `toScalaOpt` is defined outside this file; presumably it yields a Scala
    // Future of Option with a null result mapped to None - confirm.
    val lookup = F.suspend(liftFuture(cache.getAsync(cacheName).toScalaOpt))
    lookup.map { cached =>
      cached.map {
        case (elements, materialized) =>
          Source(elements).mapMaterializedValue(_ => materialized)
      }
    }
  }

  /** Stops the underlying cache. */
  def stop(): F[Unit] = F.delay(cache.stop())

}

object InfinispanAkkaCaching {

  /**
    * Obtains the cache from `cacheF`, starts it, and wraps it into a [[Caching]] instance
    * for Akka Streams pipelines.
    *
    * @param cacheF            effect producing the cache to use
    * @param expirationTimeout lifespan applied to entries stored through the returned instance
    * @return the caching instance, available once the cache has been started
    */
  def apply[F[_]: Sync: LiftFuture, Mat: ClassTag, A: ClassTag](
      cacheF: F[BasicCache[String, (Vector[A], Mat)]],
      expirationTimeout: ExpirationTimeout
  )(implicit fCtg: ClassTag[F[_]], mat: Materializer, ec: ExecutionContext): F[Caching[F, Akka[Mat], A]] =
    for {
      cache <- cacheF
      _     <- Sync[F].delay(cache.start())
    } yield {
      val caching: Caching[F, Akka[Mat], A] =
        new InfinispanAkkaCaching[F, Mat, A](cache, expirationTimeout, Sync[F], LiftFuture[F])
      caching
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package trembita.seamless.akka_infinispan

import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.function.Supplier

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import cats.effect.IO
import org.infinispan.commons.api.BasicCache
import org.infinispan.configuration.cache.{Configuration, ConfigurationBuilder}
import org.infinispan.manager.DefaultCacheManager
import org.scalatest.{BeforeAndAfterAll, FlatSpec, FlatSpecLike}

import scala.concurrent.duration._
import trembita._
import trembita.akka_streams._
import trembita.caching._
import akka.testkit.TestKit

import scala.concurrent.ExecutionContext
import org.mockito.Mockito._

/**
  * Tests for [[InfinispanAkkaCaching]]: one end-to-end test against an in-memory Infinispan
  * cache and two interaction tests against a mocked [[BasicCache]].
  */
class CachingSpec extends TestKit(ActorSystem("trembita-akka-pause")) with FlatSpecLike with BeforeAndAfterAll {
  implicit val _system: ActorSystem     = system
  implicit val mat: ActorMaterializer   = ActorMaterializer()(system)
  implicit val parallelism: Parallelism = Parallelism(4, ordered = false)
  implicit val ec: ExecutionContext     = system.dispatcher

  private val configuration: Configuration = new ConfigurationBuilder().memory().build()
  private val cacheManager                 = new DefaultCacheManager(configuration)

  private val `1 second`  = ExpirationTimeout(1.second)
  private val `5 seconds` = ExpirationTimeout(5.seconds)

  override def beforeAll(): Unit = cacheManager.start()

  override def afterAll(): Unit =
    // Always shut the actor system down, even if stopping the cache manager fails,
    // and wait for termination so that no actor threads outlive the suite.
    try cacheManager.stop()
    finally TestKit.shutdownActorSystem(system)

  "infinispan caching" should "cache values of sequential pipeline" in {
    implicit val caching: Caching[IO, Akka[NotUsed], Int] =
      InfinispanAkkaCaching[IO, NotUsed, Int](
        IO(cacheManager.getCache[String, (Vector[Int], NotUsed)]("test-1")),
        `5 seconds`
      ).unsafeRunSync()

    val expected = (1 to 20).map(_ + 1).toVector
    val pipeline = Input.fromSourceF[IO](Source(1 to 20))

    val resultPipeline = pipeline
      .map { i =>
        i + 1
      }
      .cached("numbers")

    // First run evaluates the pipeline and populates the cache.
    val result = resultPipeline.into(Output.vector).run.unsafeRunSync()
    assert(result == expected)

    // Second run must produce the same values.
    val result2 = resultPipeline.into(Output.vector).run.unsafeRunSync()
    assert(result2 == expected)
  }

  it should "evaluate pipeline before caching" in {
    val mockCache = mock(classOf[BasicCache[String, (Vector[Int], NotUsed)]])

    val sourceSeq = (1 to 20).toVector
    val expected  = (1 to 20).map(_ + 1).toVector

    // Cache miss: the lookup completes with null.
    when(mockCache.getAsync("numbers")).thenReturn(CompletableFuture.supplyAsync(new Supplier[(Vector[Int], NotUsed)] {
      override def get(): (Vector[Int], NotUsed) = null
    }))

    // Stub the exact call the implementation makes (4-argument overload, mapped values,
    // 5 second lifespan). A null result means there was no previous value for the key.
    when(mockCache.putIfAbsentAsync("numbers", (expected, NotUsed), 5L, TimeUnit.SECONDS))
      .thenReturn(CompletableFuture.completedFuture[(Vector[Int], NotUsed)](null))

    implicit val caching: Caching[IO, Akka[NotUsed], Int] =
      InfinispanAkkaCaching[IO, NotUsed, Int](
        IO(mockCache),
        `5 seconds`
      ).unsafeRunSync()

    val pipeline = Input.fromSourceF[IO](Source(sourceSeq))

    val resultPipeline = pipeline
      .map { i =>
        i + 1
      }
      .cached("numbers")

    val result = resultPipeline.into(Output.vector).run.unsafeRunSync()
    assert(result == expected)

    verify(mockCache, times(1)).start()
    verify(mockCache, times(1)).putIfAbsentAsync("numbers", (expected, NotUsed), 5, TimeUnit.SECONDS)
    verify(mockCache, times(1)).getAsync("numbers")
    verifyNoMoreInteractions(mockCache)
  }

  it should "not evaluate pipeline if it was cached" in {
    val mockCache = mock(classOf[BasicCache[String, (Vector[Int], NotUsed)]])

    val sourceSeq = (1 to 20).toVector
    val expected  = (1 to 20).map(_ + 1).toVector

    // Cache hit: the lookup returns the already-mapped values.
    when(mockCache.getAsync("numbers")).thenReturn(CompletableFuture.completedFuture[(Vector[Int], NotUsed)]((expected, NotUsed)))

    implicit val caching: Caching[IO, Akka[NotUsed], Int] =
      InfinispanAkkaCaching[IO, NotUsed, Int](
        IO(mockCache),
        `5 seconds`
      ).unsafeRunSync()

    val pipeline = Input.fromSourceF[IO](Source(sourceSeq))

    val resultPipeline = pipeline
      .map { i =>
        i + 1
      }
      .cached("numbers")

    val result = resultPipeline.into(Output.vector).run.unsafeRunSync()
    assert(result == expected)

    // Only start() and the lookup may touch the cache; nothing is written on a hit.
    verify(mockCache, times(1)).start()
    verify(mockCache, times(1)).getAsync("numbers")
    verifyNoMoreInteractions(mockCache)
  }
}

0 comments on commit 59f622a

Please sign in to comment.