Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not manually generate Mongo ObjectIds on the client. Let MongoDB to create them. #238

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 15 additions & 47 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
val releaseV = "2.2.9"
import sbt.Credentials
import sbt.Keys.{credentials, publishConfiguration}

val scalaV = "2.11.12"
val releaseV = "2.2.10_TN"

val scalaV = "2.12.8"

val AkkaV = "2.5.12" //min version to have Serialization.withTransportInformation
val MongoJavaDriverVersion = "3.8.2"
@@ -39,7 +42,7 @@ ThisBuild / scalaVersion := scalaV

val commonSettings = Seq(
scalaVersion := scalaV,
crossScalaVersions := Seq("2.11.12", "2.12.8"),
crossScalaVersions := Seq("2.12.8", "2.11.12"),
dependencyOverrides += "org.mongodb" % "mongodb-driver" % "3.8.2" ,
libraryDependencies ++= commonDeps(scalaBinaryVersion.value),
dependencyOverrides ++= Seq(
@@ -82,40 +85,23 @@ val commonSettings = Seq(
testOptions in Test += Tests.Argument("-oDS"),
testOptions in Travis += Tests.Argument("-l", "org.scalatest.tags.Slow"),
fork in Test := true,
publishTo := sonatypePublishTo.value,
//publishTo := sonatypePublishTo.value,
publishTo := {
if (isSnapshot.value)
Some("Artifactory Realm" at "https://artifactory.linkedstore.com/artifactory/libs-snapshot-local")
else
Some("Artifactory Realm" at "https://artifactory.linkedstore.com/artifactory/libs-release-local")
},
credentials += Credentials("Artifactory Realm", "artifactory.linkedstore.com", "publisher", "u=87Zc;hyA!5K3_D"),
publishConfiguration := publishConfiguration.value.withOverwrite(true),
publishLocalConfiguration := publishLocalConfiguration.value.withOverwrite(true)

) ++ inConfig(Travis)(Defaults.testTasks)

lazy val `akka-persistence-mongo-common` = (project in file("common"))
.settings(commonSettings:_*)
.configs(Travis)

lazy val `akka-persistence-mongo-casbah` = (project in file("casbah"))
.dependsOn(`akka-persistence-mongo-common` % "test->test;compile->compile")
.settings(commonSettings:_*)
.settings(
libraryDependencies ++= Seq(
"org.mongodb" %% "casbah" % "3.1.1" % "compile"
)
)
.configs(Travis)

lazy val `akka-persistence-mongo-scala` = (project in file("scala"))
.dependsOn(`akka-persistence-mongo-common` % "test->test;compile->compile")
.settings(commonSettings:_*)
.settings(
libraryDependencies ++= Seq(
"org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "compile",
"org.mongodb.scala" %% "mongo-scala-bson" % "2.4.2" % "compile",
"io.netty" % "netty-buffer" % "4.1.17.Final" % "compile",
"io.netty" % "netty-transport" % "4.1.17.Final" % "compile",
"io.netty" % "netty-handler" % "4.1.17.Final" % "compile",
"org.reactivestreams" % "reactive-streams" % "1.0.2"
)
)
.configs(Travis)

lazy val `akka-persistence-mongo-rxmongo` = (project in file("rxmongo"))
.dependsOn(`akka-persistence-mongo-common` % "test->test;compile->compile")
.settings(commonSettings:_*)
@@ -130,21 +116,3 @@ lazy val `akka-persistence-mongo-rxmongo` = (project in file("rxmongo"))
)
)
.configs(Travis)

lazy val `akka-persistence-mongo-tools` = (project in file("tools"))
.dependsOn(`akka-persistence-mongo-scala` % "test->test;compile->compile")
.settings(commonSettings:_*)
.settings(
libraryDependencies ++= Seq(
"org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "compile"
)
)
.configs(Travis)

lazy val `akka-persistence-mongo` = (project in file("."))
.aggregate(`akka-persistence-mongo-common`, `akka-persistence-mongo-casbah`, `akka-persistence-mongo-rxmongo`, `akka-persistence-mongo-scala`, `akka-persistence-mongo-tools`)
.settings(
crossScalaVersions := Nil,
skip in publish := true,
publishTo := Some(Resolver.file("file", new File("target/unusedrepo")))
)
Original file line number Diff line number Diff line change
@@ -78,7 +78,7 @@ class CasbahSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSystem)
}

implicit object Serializer extends CanSerializeJournal[DBObject] {
override def serializeAtom(atom: Atom): DBObject = {
override def serializeAtom(atom: Atom, realtimeEnablePersistence: Boolean): DBObject = {
Option(atom.tags).filter(_.nonEmpty).foldLeft(
MongoDBObject(
ID -> ObjectId.get(),
Original file line number Diff line number Diff line change
@@ -84,7 +84,7 @@ class CasbahPersistenceJournallerSpec extends TestKit(ActorSystem("unit-test"))
def serialize(ref:ActorRef,casbahSerializers: CasbahSerializers): DBObject ={
val myPayload = Payload[com.mongodb.DBObject](ShardRegionTerminated(ref))(casbahSerializers.serialization,implicitly,casbahSerializers.dt,casbahSerializers.loader)
val repr = Atom(pid = "pid", from = 1L, to = 1L, events = ISeq(Event(pid = "pid", sn = 1L, payload = myPayload)))
val serialized = serializeAtom(repr)
val serialized = serializeAtom(repr, realtimeEnablePersistence = true)
serialized
}

@@ -121,7 +121,7 @@ class CasbahPersistenceJournallerSpec extends TestKit(ActorSystem("unit-test"))

val repr = Atom(pid = "pid", from = 1L, to = 1L, events = ISeq(Event(pid = "pid", sn = 1L, payload = "TEST")))

val serialized = serializeAtom(repr)
val serialized = serializeAtom(repr, realtimeEnablePersistence = true)

val atom = serialized

Original file line number Diff line number Diff line change
@@ -42,7 +42,7 @@ object MongoPersistenceDriver {
}

trait CanSerializeJournal[D] {
def serializeAtom(atom: Atom): D
def serializeAtom(atom: Atom, realtimeEnablePersistence: Boolean): D
}

trait CanDeserializeJournal[D] {
@@ -289,5 +289,5 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config)

def deserializeJournal(dbo: D)(implicit ev: CanDeserializeJournal[D]): Event = ev.deserializeDocument(dbo)

def serializeJournal(aw: Atom)(implicit ev: CanSerializeJournal[D]): D = ev.serializeAtom(aw)
def serializeJournal(aw: Atom)(implicit ev: CanSerializeJournal[D]): D = ev.serializeAtom(aw, realtimeEnablePersistence)
}
Original file line number Diff line number Diff line change
@@ -161,17 +161,17 @@ class RxMongoSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSystem)

implicit object JournalSerializer extends CanSerializeJournal[BSONDocument] with JournallingFieldNames {

override def serializeAtom(atom: Atom): BSONDocument = {
Option(atom.tags).filter(_.nonEmpty).foldLeft(
BSONDocument(
ID -> BSONObjectID.generate(),
override def serializeAtom(atom: Atom, realtimeEnablePersistence: Boolean): BSONDocument = {
Option(atom.tags).filter(_.nonEmpty).foldLeft {
val bsonDocument = BSONDocument(
PROCESSOR_ID -> atom.pid,
FROM -> atom.from,
TO -> atom.to,
EVENTS -> BSONArray(atom.events.map(serializeEvent)),
VERSION -> 1
)
){ case(d,tags) => d.merge(TAGS -> serializeTags(tags)) }
VERSION -> 1)
if (realtimeEnablePersistence) bsonDocument :~ (ID -> BSONObjectID.generate())
else bsonDocument
}{ case(d,tags) => d.merge(TAGS -> serializeTags(tags)) }
}

import Producer._
Original file line number Diff line number Diff line change
@@ -103,7 +103,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys
}

implicit object Serializer extends CanSerializeJournal[BsonDocument] with DefaultBsonTransformers {
override def serializeAtom(atom: Atom): BsonDocument = {
override def serializeAtom(atom: Atom, realtimeEnablePersistence: Boolean): BsonDocument = {
Option(atom.tags).filter(_.nonEmpty).foldLeft(
BsonDocument(
ID -> BsonObjectId(),