Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for querying last known sequence number by persistenceId. #267

Merged
merged 8 commits into from
Feb 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# https://github.com/apache/pekko-persistence-jdbc/pull/267
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.jdbc.query.dao.ReadJournalDao.lastPersistenceIdSequenceNumber")
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class DefaultReadJournalDao(
override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] =
Source.fromPublisher(db.stream(queries.journalSequenceQuery((offset, limit)).result))

override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] =
db.run(queries.lastPersistenceIdSequenceNumberQuery(persistenceId).result)

override def maxJournalSequence(): Future[Long] =
db.run(queries.maxJournalSequenceQuery.result)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ trait ReadJournalDao extends JournalDaoWithReadMessages {
*/
def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed]

/**
* Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown.
*
* @param persistenceId The `persistenceId` for which the last known sequence number should be returned.
* @return Some sequence number or None if the `persistenceId` is unknown.
*/
def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]]

/**
* @return The value of the maximum (ordering) id in the journal
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
val messagesQuery = Compiled(_messagesQuery _)
val eventsByTag = Compiled(_eventsByTag _)
val journalSequenceQuery = Compiled(_journalSequenceQuery _)
val lastPersistenceIdSequenceNumberQuery = Compiled(_lastPersistenceIdSequenceNumberQuery _)
val maxJournalSequenceQuery = Compiled {
JournalTable.map(_.ordering).max.getOrElse(0L)
}
Expand All @@ -43,6 +44,12 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
}

private def _lastPersistenceIdSequenceNumberQuery(persistenceId: Rep[String]) =
baseTableQuery()
.filter(_.persistenceId === persistenceId)
.map(_.sequenceNumber)
.max

private def _messagesQuery(
persistenceId: Rep[String],
fromSequenceNr: Rep[Long],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
.via(serializer.deserializeFlow)
}

override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] =
db.run(queries.lastPersistenceIdSequenceNumberQuery(persistenceId).result)

override def messages(
persistenceId: String,
fromSequenceNr: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
val messagesQuery = Compiled(_messagesQuery _)
val eventsByTag = Compiled(_eventsByTag _)
val journalSequenceQuery = Compiled(_journalSequenceQuery _)
val lastPersistenceIdSequenceNumberQuery = Compiled(_lastPersistenceIdSequenceNumberQuery _)
val maxJournalSequenceQuery = Compiled {
JournalTable.map(_.ordering).max.getOrElse(0L)
}
Expand All @@ -38,6 +39,12 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
private def _allPersistenceIdsDistinct(max: ConstColumn[Long]): Query[Rep[String], String, Seq] =
baseTableQuery().map(_.persistenceId).distinct.take(max)

private def _lastPersistenceIdSequenceNumberQuery(persistenceId: Rep[String]) =
baseTableQuery()
.filter(_.persistenceId === persistenceId)
.map(_.sequenceNumber)
.max

private def _messagesQuery(
persistenceId: Rep[String],
fromSequenceNr: Rep[Long],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
package org.apache.pekko.persistence.jdbc.query.javadsl

import org.apache.pekko

import java.util.Optional
import java.util.concurrent.CompletionStage

import pekko.NotUsed
import pekko.persistence.jdbc.query.scaladsl.{ JdbcReadJournal => ScalaJdbcReadJournal }
import pekko.persistence.query.{ EventEnvelope, Offset }
import pekko.persistence.query.javadsl._
import pekko.stream.javadsl.Source
import pekko.util.FutureConverters._
import pekko.util.OptionConverters._

object JdbcReadJournal {
final val Identifier = ScalaJdbcReadJournal.Identifier
Expand Down Expand Up @@ -132,4 +138,16 @@ class JdbcReadJournal(journal: ScalaJdbcReadJournal)
*/
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
journal.eventsByTag(tag, offset).asJava

/**
* Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown.
*
* @param persistenceId The `persistenceId` for which the last known sequence number should be returned.
* @return Some sequence number or None if the `persistenceId` is unknown.
*/
def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): CompletionStage[Optional[java.lang.Long]] =
journal
.currentLastKnownSequenceNumberByPersistenceId(persistenceId)
.asJava
.thenApply(_.map(java.lang.Long.valueOf).toJava)
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,13 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E

def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
eventsByTag(tag, offset, terminateAfterOffset = None)

/**
* Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown.
*
* @param persistenceId The `persistenceId` for which the last known sequence number should be returned.
* @return Some sequence number or None if the `persistenceId` is unknown.
*/
def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): Future[Option[Long]] =
readJournalDao.lastPersistenceIdSequenceNumber(persistenceId)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.jdbc.query

import org.scalatest.concurrent.ScalaFutures

abstract class CurrentLastKnownSequenceNumberByPersistenceIdTest(config: String) extends QueryTestSpec(config)
with ScalaFutures {

it should "return None for unknown persistenceId" in withActorSystem { implicit system =>
val journalOps = new ScalaJdbcReadJournalOperations(system)

journalOps
.currentLastKnownSequenceNumberByPersistenceId("unknown")
.futureValue shouldBe None
}

it should "return last sequence number for known persistenceId" in withActorSystem { implicit system =>
val journalOps = new ScalaJdbcReadJournalOperations(system)

withTestActors() { (actor1, _, _) =>
actor1 ! 1
actor1 ! 2
actor1 ! 3
actor1 ! 4

eventually {
journalOps
.currentLastKnownSequenceNumberByPersistenceId("my-1")
.futureValue shouldBe Some(4)

// Just ensuring that query targets the correct persistenceId.
journalOps
.currentLastKnownSequenceNumberByPersistenceId("my-2")
.futureValue shouldBe None
}
}
}
}

class H2ScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
extends CurrentLastKnownSequenceNumberByPersistenceIdTest("h2-shared-db-application.conf")
with H2Cleaner
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class ScalaJdbcReadJournalOperations(readJournal: JdbcReadJournal)(implicit syst
tp.within(within)(f(tp))
}

def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): Future[Option[Long]] =
readJournal.currentLastKnownSequenceNumberByPersistenceId(persistenceId)

override def countJournal: Future[Long] =
readJournal
.currentPersistenceIds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class TestProbeReadJournalDao(val probe: TestProbe) extends ReadJournalDao {
maxOffset: Long,
max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = ???

override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] = ???

/**
* Returns a Source of bytes for a certain persistenceId
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.jdbc.integration

import org.apache.pekko.persistence.jdbc.query.{
CurrentLastKnownSequenceNumberByPersistenceIdTest,
MysqlCleaner,
OracleCleaner,
PostgresCleaner,
SqlServerCleaner
}

// Note: these tests use the shared-db configs, the test for all (so not only current) events use the regular db config

class PostgresScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
extends CurrentLastKnownSequenceNumberByPersistenceIdTest("postgres-shared-db-application.conf")
with PostgresCleaner

class MySQLScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
extends CurrentLastKnownSequenceNumberByPersistenceIdTest("mysql-shared-db-application.conf")
with MysqlCleaner

class OracleScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
extends CurrentLastKnownSequenceNumberByPersistenceIdTest("oracle-shared-db-application.conf")
with OracleCleaner

class SqlServerScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
extends CurrentLastKnownSequenceNumberByPersistenceIdTest("sqlserver-shared-db-application.conf")
with SqlServerCleaner
Loading