From 4fcceb17ba6fcc6d9a812cd09d3b20b40a9f5895 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 19 Apr 2024 10:46:15 +0800 Subject: [PATCH] keep the highest sequence number event --- .../persistence/jdbc/journal/dao/DefaultJournalDao.scala | 4 +++- .../pekko/persistence/jdbc/journal/dao/JournalQueries.scala | 1 + .../jdbc/journal/dao/legacy/ByteArrayJournalDao.scala | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/DefaultJournalDao.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/DefaultJournalDao.scala index 421a123d..bcca6556 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/DefaultJournalDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/DefaultJournalDao.scala @@ -59,7 +59,9 @@ class DefaultJournalDao( new JournalQueries(profile, journalConfig.eventJournalTableConfiguration, journalConfig.eventTagTableConfiguration) override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] = { - db.run(queries.delete(persistenceId, maxSequenceNr)).map(_ => ()) + // We should keep journal record with highest sequence number in order to be compliant + // with @see [[pekko.persistence.journal.JournalSpec]] + db.run(queries.delete(persistenceId, maxSequenceNr - 1)).map(_ => ()) } override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala index 0b19d63b..cb55b4ed 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala @@ -62,6 +62,7 @@ class JournalQueries( private def _selectByPersistenceIdAndMaxSequenceNr(persistenceId: Rep[String], maxSeqNr: Rep[Long]) = _selectByPersistenceId(persistenceId).filter(_.sequenceNumber <= maxSeqNr) + val selectByPersistenceIdAndMaxSequenceNr = Compiled(_selectByPersistenceIdAndMaxSequenceNr _) private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] = diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala index 14a2fd57..5e3ab4df 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala @@ -87,7 +87,7 @@ trait BaseByteArrayJournalDao override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] = { // We should keep journal record with highest sequence number in order to be compliant // with @see [[pekko.persistence.journal.JournalSpec]] - db.run(queries.delete(persistenceId, maxSequenceNr)).map(_ => ()) + db.run(queries.delete(persistenceId, maxSequenceNr - 1)).map(_ => ()) } def update(persistenceId: String, sequenceNr: Long, payload: AnyRef): Future[Done] = {