diff --git a/core/src/main/mima-filters/1.1.x.backwards.excludes/LastKnownSequenceNumber.excludes b/core/src/main/mima-filters/1.1.x.backwards.excludes/LastKnownSequenceNumber.excludes new file mode 100644 index 00000000..03f1fcf8 --- /dev/null +++ b/core/src/main/mima-filters/1.1.x.backwards.excludes/LastKnownSequenceNumber.excludes @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# https://github.com/apache/pekko-persistence-jdbc/pull/267 +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.jdbc.query.dao.ReadJournalDao.lastPersistenceIdSequenceNumber") diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/DefaultReadJournalDao.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/DefaultReadJournalDao.scala index df14153f..33cd949e 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/DefaultReadJournalDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/DefaultReadJournalDao.scala @@ -60,6 +60,9 @@ class DefaultReadJournalDao( override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] = Source.fromPublisher(db.stream(queries.journalSequenceQuery((offset, limit)).result)) + override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] = + db.run(queries.lastPersistenceIdSequenceNumberQuery(persistenceId).result) + override def maxJournalSequence(): Future[Long] = db.run(queries.maxJournalSequenceQuery.result) diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalDao.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalDao.scala index 10fe78a2..e55ce88b 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalDao.scala @@ -49,6 +49,14 @@ trait ReadJournalDao extends JournalDaoWithReadMessages { */ def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] + /** + * Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown. + * + * @param persistenceId The `persistenceId` for which the last known sequence number should be returned. + * @return Some sequence number or None if the `persistenceId` is unknown. + */ + def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] + /** * @return The value of the maximum (ordering) id in the journal */ diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala index 4d2884bb..cbe11d03 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala @@ -29,6 +29,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo val messagesQuery = Compiled(_messagesQuery _) val eventsByTag = Compiled(_eventsByTag _) val journalSequenceQuery = Compiled(_journalSequenceQuery _) + val lastPersistenceIdSequenceNumberQuery = Compiled(_lastPersistenceIdSequenceNumberQuery _) val maxJournalSequenceQuery = Compiled { JournalTable.map(_.ordering).max.getOrElse(0L) } @@ -43,6 +44,12 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo baseTableQuery().join(TagTable).on(_.ordering === _.eventId) } + private def _lastPersistenceIdSequenceNumberQuery(persistenceId: Rep[String]) = + baseTableQuery() + .filter(_.persistenceId === persistenceId) + .map(_.sequenceNumber) + .max + private def _messagesQuery( persistenceId: Rep[String], fromSequenceNr: Rep[Long], diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala index 42a00d08..448390c4 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala @@ -59,6 +59,9 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith .via(serializer.deserializeFlow) } + override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] = + db.run(queries.lastPersistenceIdSequenceNumberQuery(persistenceId).result) + override def messages( persistenceId: String, fromSequenceNr: Long, diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala index 7640678f..12b6e24d 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala @@ -28,6 +28,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo val messagesQuery = Compiled(_messagesQuery _) val eventsByTag = Compiled(_eventsByTag _) val journalSequenceQuery = Compiled(_journalSequenceQuery _) + val lastPersistenceIdSequenceNumberQuery = Compiled(_lastPersistenceIdSequenceNumberQuery _) val maxJournalSequenceQuery = Compiled { JournalTable.map(_.ordering).max.getOrElse(0L) } @@ -38,6 +39,12 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo private def _allPersistenceIdsDistinct(max: ConstColumn[Long]): Query[Rep[String], String, Seq] = baseTableQuery().map(_.persistenceId).distinct.take(max) + private def _lastPersistenceIdSequenceNumberQuery(persistenceId: Rep[String]) = + baseTableQuery() + .filter(_.persistenceId === persistenceId) + .map(_.sequenceNumber) + .max + private def _messagesQuery( persistenceId: Rep[String], fromSequenceNr: Rep[Long], diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/javadsl/JdbcReadJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/javadsl/JdbcReadJournal.scala index 871cfe6a..e2009aef 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/javadsl/JdbcReadJournal.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/javadsl/JdbcReadJournal.scala @@ -15,11 +15,17 @@ package org.apache.pekko.persistence.jdbc.query.javadsl import org.apache.pekko + +import java.util.Optional +import java.util.concurrent.CompletionStage + import pekko.NotUsed import pekko.persistence.jdbc.query.scaladsl.{ JdbcReadJournal => ScalaJdbcReadJournal } import pekko.persistence.query.{ EventEnvelope, Offset } import pekko.persistence.query.javadsl._ import pekko.stream.javadsl.Source +import pekko.util.FutureConverters._ +import pekko.util.OptionConverters._ object JdbcReadJournal { final val Identifier = ScalaJdbcReadJournal.Identifier @@ -132,4 +138,16 @@ class JdbcReadJournal(journal: ScalaJdbcReadJournal) */ override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = journal.eventsByTag(tag, offset).asJava + + /** + * Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown. + * + * @param persistenceId The `persistenceId` for which the last known sequence number should be returned. + * @return Some sequence number or None if the `persistenceId` is unknown. + */ + def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): CompletionStage[Optional[java.lang.Long]] = + journal + .currentLastKnownSequenceNumberByPersistenceId(persistenceId) + .asJava + .thenApply(_.map(java.lang.Long.valueOf).toJava) } diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala index 633644eb..f55067eb 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala @@ -317,4 +317,13 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = eventsByTag(tag, offset, terminateAfterOffset = None) + + /** + * Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown. + * + * @param persistenceId The `persistenceId` for which the last known sequence number should be returned. + * @return Some sequence number or None if the `persistenceId` is unknown. + */ + def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): Future[Option[Long]] = + readJournalDao.lastPersistenceIdSequenceNumber(persistenceId) } diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala new file mode 100644 index 00000000..47b5a5cc --- /dev/null +++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.jdbc.query + +import org.scalatest.concurrent.ScalaFutures + +abstract class CurrentLastKnownSequenceNumberByPersistenceIdTest(config: String) extends QueryTestSpec(config) + with ScalaFutures { + + it should "return None for unknown persistenceId" in withActorSystem { implicit system => + val journalOps = new ScalaJdbcReadJournalOperations(system) + + journalOps + .currentLastKnownSequenceNumberByPersistenceId("unknown") + .futureValue shouldBe None + } + + it should "return last sequence number for known persistenceId" in withActorSystem { implicit system => + val journalOps = new ScalaJdbcReadJournalOperations(system) + + withTestActors() { (actor1, _, _) => + actor1 ! 1 + actor1 ! 2 + actor1 ! 3 + actor1 ! 4 + + eventually { + journalOps + .currentLastKnownSequenceNumberByPersistenceId("my-1") + .futureValue shouldBe Some(4) + + // Just ensuring that query targets the correct persistenceId. + journalOps + .currentLastKnownSequenceNumberByPersistenceId("my-2") + .futureValue shouldBe None + } + } + } +} + +class H2ScalaCurrentLastKnownSequenceNumberByPersistenceIdTest + extends CurrentLastKnownSequenceNumberByPersistenceIdTest("h2-shared-db-application.conf") + with H2Cleaner diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala index 509edac1..0d3c65bc 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala @@ -110,6 +110,9 @@ class ScalaJdbcReadJournalOperations(readJournal: JdbcReadJournal)(implicit syst tp.within(within)(f(tp)) } + def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): Future[Option[Long]] = + readJournal.currentLastKnownSequenceNumberByPersistenceId(persistenceId) + override def countJournal: Future[Long] = readJournal .currentPersistenceIds() diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala index af3c551c..cd15c57d 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala @@ -54,6 +54,8 @@ class TestProbeReadJournalDao(val probe: TestProbe) extends ReadJournalDao { maxOffset: Long, max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = ??? + override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] = ??? + /** * Returns a Source of bytes for a certain persistenceId */ diff --git a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala new file mode 100644 index 00000000..35eb0eaa --- /dev/null +++ b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +package org.apache.pekko.persistence.jdbc.integration + +import org.apache.pekko.persistence.jdbc.query.{ + CurrentLastKnownSequenceNumberByPersistenceIdTest, + MysqlCleaner, + OracleCleaner, + PostgresCleaner, + SqlServerCleaner +} + +// Note: these tests use the shared-db configs, the test for all (so not only current) events use the regular db config + +class PostgresScalaCurrentLastKnownSequenceNumberByPersistenceIdTest + extends CurrentLastKnownSequenceNumberByPersistenceIdTest("postgres-shared-db-application.conf") + with PostgresCleaner + +class MySQLScalaCurrentLastKnownSequenceNumberByPersistenceIdTest + extends CurrentLastKnownSequenceNumberByPersistenceIdTest("mysql-shared-db-application.conf") + with MysqlCleaner + +class OracleScalaCurrentLastKnownSequenceNumberByPersistenceIdTest + extends CurrentLastKnownSequenceNumberByPersistenceIdTest("oracle-shared-db-application.conf") + with OracleCleaner + +class SqlServerScalaCurrentLastKnownSequenceNumberByPersistenceIdTest + extends CurrentLastKnownSequenceNumberByPersistenceIdTest("sqlserver-shared-db-application.conf") + with SqlServerCleaner