Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for querying last known sequence number by persistenceId. #267

Merged
merged 8 commits into from
Feb 16, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class DefaultReadJournalDao(
override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] =
Source.fromPublisher(db.stream(queries.journalSequenceQuery((offset, limit)).result))

override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] =
db.run(queries.lastPersistenceIdSequenceNumberQuery(persistenceId).result)

override def maxJournalSequence(): Future[Long] =
db.run(queries.maxJournalSequenceQuery.result)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ trait ReadJournalDao extends JournalDaoWithReadMessages {
*/
def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed]

/**
* Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown.
*
* @param persistenceId The `persistenceId` for which the last known sequence number should be returned.
* @return Some sequence number or None if the `persistenceId` is unknown.
*/
def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]]

/**
* @return The value of the maximum (ordering) id in the journal
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
val messagesQuery = Compiled(_messagesQuery _)
val eventsByTag = Compiled(_eventsByTag _)
val journalSequenceQuery = Compiled(_journalSequenceQuery _)
val lastPersistenceIdSequenceNumberQuery = Compiled(_lastPersistenceIdSequenceNumberQuery _)
val maxJournalSequenceQuery = Compiled {
JournalTable.map(_.ordering).max.getOrElse(0L)
}
Expand All @@ -43,6 +44,12 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
}

private def _lastPersistenceIdSequenceNumberQuery(persistenceId: Rep[String]) =
baseTableQuery()
.filter(_.persistenceId === persistenceId)
.map(_.sequenceNumber)
.max

private def _messagesQuery(
persistenceId: Rep[String],
fromSequenceNr: Rep[Long],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
.via(serializer.deserializeFlow)
}

override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] =
db.run(queries.lastPersistenceIdSequenceNumberQuery(persistenceId).result)

override def messages(
persistenceId: String,
fromSequenceNr: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
val messagesQuery = Compiled(_messagesQuery _)
val eventsByTag = Compiled(_eventsByTag _)
val journalSequenceQuery = Compiled(_journalSequenceQuery _)
val lastPersistenceIdSequenceNumberQuery = Compiled(_lastPersistenceIdSequenceNumberQuery _)
val maxJournalSequenceQuery = Compiled {
JournalTable.map(_.ordering).max.getOrElse(0L)
}
Expand All @@ -38,6 +39,12 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
private def _allPersistenceIdsDistinct(max: ConstColumn[Long]): Query[Rep[String], String, Seq] =
baseTableQuery().map(_.persistenceId).distinct.take(max)

private def _lastPersistenceIdSequenceNumberQuery(persistenceId: Rep[String]) =
baseTableQuery()
.filter(_.persistenceId === persistenceId)
.map(_.sequenceNumber)
.max

private def _messagesQuery(
persistenceId: Rep[String],
fromSequenceNr: Rep[Long],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change the license header on new files that are not copies of other files to be https://github.com/apache/pekko/blob/main/project/AddMetaInfLicenseFiles.scala#L1-L16 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! Updated in b033218.

* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.jdbc.query.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

/**
* A trait that enables querying the current last known sequence number for a given `persistenceId`.
*/
trait CurrentLastKnownSequenceNumberByPersistenceIdQuery {

/**
* Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown.
*
* @param persistenceId The `persistenceId` for which the last known sequence number should be returned.
* @return Some sequence number or None if the `persistenceId` is unknown.
*/
def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): CompletionStage[Optional[Long]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
package org.apache.pekko.persistence.jdbc.query.javadsl

import org.apache.pekko

import java.util.Optional
import java.util.concurrent.CompletionStage

import pekko.NotUsed
import pekko.persistence.jdbc.query.scaladsl.{ JdbcReadJournal => ScalaJdbcReadJournal }
import pekko.persistence.query.{ EventEnvelope, Offset }
import pekko.persistence.query.javadsl._
import pekko.stream.javadsl.Source
import pekko.util.FutureConverters._
import pekko.util.OptionConverters._

object JdbcReadJournal {
final val Identifier = ScalaJdbcReadJournal.Identifier
Expand All @@ -32,7 +38,8 @@ class JdbcReadJournal(journal: ScalaJdbcReadJournal)
with CurrentEventsByPersistenceIdQuery
with EventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with EventsByTagQuery {
with EventsByTagQuery
with CurrentLastKnownSequenceNumberByPersistenceIdQuery {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need the CurrentLastKnownSequenceNumberByPersistenceIdQuery trait? Can't we just add the function without having a marker trait.
We can add the trait to the pekko-persistence jar at some stage and then do another PR to uptake the trait here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming a marker trait would allow writing this:

PersistenceQuery(system).readJournalFor[CurrentLastKnownSequenceNumberByPersistenceIdQuery](JdbcReadJournal.Identifier)

Instead of having to write this:

PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)

But if the goal is to eventually have the marker trait in the core repository then this serves very little and temporary purpose, so overall I agree with suggestion to remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main intent was to stick to existing marker traits application + having that part of contract testable on its own. In our applications we rarely declare dependencies to the full query journal, usually we tend to have the capabilities speaking:

object FancyAggregator {
  def apply(readJournal: EventsByTagQuery)
}

vs

object FancyAggregator {
  def apply(readJournal: JdbcReadJournal)
}

If you feel more comfortable, surely, I can try to inline it without marker trait. Would make dependency injection a bit harder.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My aim would be merge this without the marker trait meaning that we could release a version pekko-persistence-jdbc 1.1.x that supports this but without needing a new Pekko core release. pekko-persistence 1.2.0-M2 and above could have the marker trait and a pekko-persistence-jdbc 1.2.0(-M1) release could rely on pekko-persistence 1.2.0-M2 and uptake the marker trait.

I haven't studied pekko-persistence-r2dbc but I presume that it too could implement this support and in a similar timeline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will drop the marker trait today or tomorrow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed marker trait with ce15d5e.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ptrdom it looks like pekko-persistence-r2dbc can have a similar PR added at some stage? Can you confirm this? I can look into adding the marker trait to the pekko core but it could be a while before it makes it into a full release meaning that we will have to wait to add the uptake of the marker trait.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it should be no problem to add this functionality to pekko-persistence-r2dbc.


/**
* Same type of query as `persistenceIds` but the event stream
Expand Down Expand Up @@ -132,4 +139,10 @@ class JdbcReadJournal(journal: ScalaJdbcReadJournal)
*/
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
journal.eventsByTag(tag, offset).asJava

override def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): CompletionStage[Optional[Long]] =
journal
.currentLastKnownSequenceNumberByPersistenceId(persistenceId)
.asJava
.thenApply(_.toJava)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.jdbc.query.scaladsl

import scala.concurrent.Future

/**
* A trait that enables querying the current last known sequence number for a given `persistenceId`.
*/
trait CurrentLastKnownSequenceNumberByPersistenceIdQuery {

/**
* Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown.
*
* @param persistenceId The `persistenceId` for which the last known sequence number should be returned.
* @return Some sequence number or None if the `persistenceId` is unknown.
*/
def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): Future[Option[Long]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E
with CurrentEventsByPersistenceIdQuery
with EventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with EventsByTagQuery {
with EventsByTagQuery
with CurrentLastKnownSequenceNumberByPersistenceIdQuery {

PluginVersionChecker.check()

Expand Down Expand Up @@ -317,4 +318,7 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E

def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
eventsByTag(tag, offset, terminateAfterOffset = None)

override def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): Future[Option[Long]] =
readJournalDao.lastPersistenceIdSequenceNumber(persistenceId)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.jdbc.query

import org.scalatest.concurrent.ScalaFutures

abstract class CurrentLastKnownSequenceNumberByPersistenceIdTest(
config: String
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this style is still wrong

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hope f0cc6c5 does the job?

) extends QueryTestSpec(config) with ScalaFutures {

it should "return None for unknown persistenceId" in withActorSystem { implicit system =>
val journalOps = new ScalaJdbcReadJournalOperations(system)

journalOps
.currentLastKnownSequenceNumberByPersistenceId("unknown")
.futureValue shouldBe None
}

it should "return last sequence number for known persistenceId" in withActorSystem { implicit system =>
val journalOps = new ScalaJdbcReadJournalOperations(system)

withTestActors() { (actor1, _, _) =>
actor1 ! 1
actor1 ! 2
actor1 ! 3
actor1 ! 4

eventually {
journalOps
.currentLastKnownSequenceNumberByPersistenceId("my-1")
.futureValue shouldBe Some(4)

// Just ensuring that query targets the correct persistenceId.
journalOps
.currentLastKnownSequenceNumberByPersistenceId("my-2")
.futureValue shouldBe None
}
}
}
}

class H2ScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
extends CurrentLastKnownSequenceNumberByPersistenceIdTest("h2-shared-db-application.conf")
with H2Cleaner
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class ScalaJdbcReadJournalOperations(readJournal: JdbcReadJournal)(implicit syst
tp.within(within)(f(tp))
}

def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): Future[Option[Long]] =
readJournal.currentLastKnownSequenceNumberByPersistenceId(persistenceId)

override def countJournal: Future[Long] =
readJournal
.currentPersistenceIds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class TestProbeReadJournalDao(val probe: TestProbe) extends ReadJournalDao {
maxOffset: Long,
max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = ???

override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] = ???

/**
* Returns a Source of bytes for a certain persistenceId
*/
Expand Down
Loading