diff --git a/build.sbt b/build.sbt index ac2af034..dc5c80a2 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,9 @@ -val releaseV = "2.2.9" +import sbt.Credentials +import sbt.Keys.{credentials, publishConfiguration} -val scalaV = "2.11.12" +val releaseV = "2.2.10_TN" + +val scalaV = "2.12.8" val AkkaV = "2.5.12" //min version to have Serialization.withTransportInformation val MongoJavaDriverVersion = "3.8.2" @@ -39,7 +42,7 @@ ThisBuild / scalaVersion := scalaV val commonSettings = Seq( scalaVersion := scalaV, - crossScalaVersions := Seq("2.11.12", "2.12.8"), + crossScalaVersions := Seq("2.12.8", "2.11.12"), dependencyOverrides += "org.mongodb" % "mongodb-driver" % "3.8.2" , libraryDependencies ++= commonDeps(scalaBinaryVersion.value), dependencyOverrides ++= Seq( @@ -82,40 +85,23 @@ val commonSettings = Seq( testOptions in Test += Tests.Argument("-oDS"), testOptions in Travis += Tests.Argument("-l", "org.scalatest.tags.Slow"), fork in Test := true, - publishTo := sonatypePublishTo.value, + //publishTo := sonatypePublishTo.value, + publishTo := { + if (isSnapshot.value) + Some("Artifactory Realm" at "https://artifactory.linkedstore.com/artifactory/libs-snapshot-local") + else + Some("Artifactory Realm" at "https://artifactory.linkedstore.com/artifactory/libs-release-local") + }, + credentials += Credentials("Artifactory Realm", "artifactory.linkedstore.com", "publisher", "u=87Zc;hyA!5K3_D"), publishConfiguration := publishConfiguration.value.withOverwrite(true), publishLocalConfiguration := publishLocalConfiguration.value.withOverwrite(true) + ) ++ inConfig(Travis)(Defaults.testTasks) lazy val `akka-persistence-mongo-common` = (project in file("common")) .settings(commonSettings:_*) .configs(Travis) -lazy val `akka-persistence-mongo-casbah` = (project in file("casbah")) - .dependsOn(`akka-persistence-mongo-common` % "test->test;compile->compile") - .settings(commonSettings:_*) - .settings( - libraryDependencies ++= Seq( - "org.mongodb" %% "casbah" % "3.1.1" % "compile" - ) - ) - .configs(Travis) - -lazy val `akka-persistence-mongo-scala` = (project in file("scala")) - .dependsOn(`akka-persistence-mongo-common` % "test->test;compile->compile") - .settings(commonSettings:_*) - .settings( - libraryDependencies ++= Seq( - "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "compile", - "org.mongodb.scala" %% "mongo-scala-bson" % "2.4.2" % "compile", - "io.netty" % "netty-buffer" % "4.1.17.Final" % "compile", - "io.netty" % "netty-transport" % "4.1.17.Final" % "compile", - "io.netty" % "netty-handler" % "4.1.17.Final" % "compile", - "org.reactivestreams" % "reactive-streams" % "1.0.2" - ) - ) - .configs(Travis) - lazy val `akka-persistence-mongo-rxmongo` = (project in file("rxmongo")) .dependsOn(`akka-persistence-mongo-common` % "test->test;compile->compile") .settings(commonSettings:_*) @@ -130,21 +116,3 @@ lazy val `akka-persistence-mongo-rxmongo` = (project in file("rxmongo")) ) ) .configs(Travis) - -lazy val `akka-persistence-mongo-tools` = (project in file("tools")) - .dependsOn(`akka-persistence-mongo-scala` % "test->test;compile->compile") - .settings(commonSettings:_*) - .settings( - libraryDependencies ++= Seq( - "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "compile" - ) - ) - .configs(Travis) - -lazy val `akka-persistence-mongo` = (project in file(".")) - .aggregate(`akka-persistence-mongo-common`, `akka-persistence-mongo-casbah`, `akka-persistence-mongo-rxmongo`, `akka-persistence-mongo-scala`, `akka-persistence-mongo-tools`) - .settings( - crossScalaVersions := Nil, - skip in publish := true, - publishTo := Some(Resolver.file("file", new File("target/unusedrepo"))) - ) diff --git a/casbah/src/main/scala/akka/contrib/persistence/mongodb/CasbahSerializers.scala b/casbah/src/main/scala/akka/contrib/persistence/mongodb/CasbahSerializers.scala index cc9af0bd..a7a9a671 100644 --- a/casbah/src/main/scala/akka/contrib/persistence/mongodb/CasbahSerializers.scala +++ b/casbah/src/main/scala/akka/contrib/persistence/mongodb/CasbahSerializers.scala @@ -78,7 +78,7 @@ class CasbahSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSystem) } implicit object Serializer extends CanSerializeJournal[DBObject] { - override def serializeAtom(atom: Atom): DBObject = { + override def serializeAtom(atom: Atom, realtimeEnablePersistence: Boolean): DBObject = { Option(atom.tags).filter(_.nonEmpty).foldLeft( MongoDBObject( ID -> ObjectId.get(), diff --git a/casbah/src/test/scala/akka/contrib/persistence/mongodb/CasbahPersistenceJournallerSpec.scala b/casbah/src/test/scala/akka/contrib/persistence/mongodb/CasbahPersistenceJournallerSpec.scala index 7fe3201a..def2aa10 100755 --- a/casbah/src/test/scala/akka/contrib/persistence/mongodb/CasbahPersistenceJournallerSpec.scala +++ b/casbah/src/test/scala/akka/contrib/persistence/mongodb/CasbahPersistenceJournallerSpec.scala @@ -84,7 +84,7 @@ class CasbahPersistenceJournallerSpec extends TestKit(ActorSystem("unit-test")) def serialize(ref:ActorRef,casbahSerializers: CasbahSerializers): DBObject ={ val myPayload = Payload[com.mongodb.DBObject](ShardRegionTerminated(ref))(casbahSerializers.serialization,implicitly,casbahSerializers.dt,casbahSerializers.loader) val repr = Atom(pid = "pid", from = 1L, to = 1L, events = ISeq(Event(pid = "pid", sn = 1L, payload = myPayload))) - val serialized = serializeAtom(repr) + val serialized = serializeAtom(repr, realtimeEnablePersistence = true) serialized } @@ -121,7 +121,7 @@ class CasbahPersistenceJournallerSpec extends TestKit(ActorSystem("unit-test")) val repr = Atom(pid = "pid", from = 1L, to = 1L, events = ISeq(Event(pid = "pid", sn = 1L, payload = "TEST"))) - val serialized = serializeAtom(repr) + val serialized = serializeAtom(repr, realtimeEnablePersistence = true) val atom = serialized diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala index 0a1e3681..f7308120 100755 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala @@ -42,7 +42,7 @@ object MongoPersistenceDriver { } trait CanSerializeJournal[D] { - def serializeAtom(atom: Atom): D + def serializeAtom(atom: Atom, realtimeEnablePersistence: Boolean): D } trait CanDeserializeJournal[D] { @@ -289,5 +289,5 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config) def deserializeJournal(dbo: D)(implicit ev: CanDeserializeJournal[D]): Event = ev.deserializeDocument(dbo) - def serializeJournal(aw: Atom)(implicit ev: CanSerializeJournal[D]): D = ev.serializeAtom(aw) + def serializeJournal(aw: Atom)(implicit ev: CanSerializeJournal[D]): D = ev.serializeAtom(aw, realtimeEnablePersistence) } \ No newline at end of file diff --git a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSerializers.scala b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSerializers.scala index b5725566..f9f1a317 100644 --- a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSerializers.scala +++ b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSerializers.scala @@ -161,17 +161,17 @@ class RxMongoSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSystem) implicit object JournalSerializer extends CanSerializeJournal[BSONDocument] with JournallingFieldNames { - override def serializeAtom(atom: Atom): BSONDocument = { - Option(atom.tags).filter(_.nonEmpty).foldLeft( - BSONDocument( - ID -> BSONObjectID.generate(), + override def serializeAtom(atom: Atom, realtimeEnablePersistence: Boolean): BSONDocument = { + Option(atom.tags).filter(_.nonEmpty).foldLeft { + val bsonDocument = BSONDocument( PROCESSOR_ID -> atom.pid, FROM -> atom.from, TO -> atom.to, EVENTS -> BSONArray(atom.events.map(serializeEvent)), - VERSION -> 1 - ) - ){ case(d,tags) => d.merge(TAGS -> serializeTags(tags)) } + VERSION -> 1) + if (realtimeEnablePersistence) bsonDocument :~ (ID -> BSONObjectID.generate()) + else bsonDocument + }{ case(d,tags) => d.merge(TAGS -> serializeTags(tags)) } } import Producer._ diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala index ed40fbe0..38580c41 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala @@ -103,7 +103,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys } implicit object Serializer extends CanSerializeJournal[BsonDocument] with DefaultBsonTransformers { - override def serializeAtom(atom: Atom): BsonDocument = { + override def serializeAtom(atom: Atom, realtimeEnablePersistence: Boolean): BsonDocument = { Option(atom.tags).filter(_.nonEmpty).foldLeft( BsonDocument( ID -> BsonObjectId(),