Skip to content

Commit

Permalink
keep the highest sequence number event
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed Apr 19, 2024
1 parent 2682cbd commit 4fcceb1
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ class DefaultJournalDao(
new JournalQueries(profile, journalConfig.eventJournalTableConfiguration, journalConfig.eventTagTableConfiguration)

override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] = {
db.run(queries.delete(persistenceId, maxSequenceNr)).map(_ => ())
// We should keep journal record with highest sequence number in order to be compliant
// with @see [[pekko.persistence.journal.JournalSpec]]
db.run(queries.delete(persistenceId, maxSequenceNr - 1)).map(_ => ())
}

override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class JournalQueries(

private def _selectByPersistenceIdAndMaxSequenceNr(persistenceId: Rep[String], maxSeqNr: Rep[Long]) =
_selectByPersistenceId(persistenceId).filter(_.sequenceNumber <= maxSeqNr)

val selectByPersistenceIdAndMaxSequenceNr = Compiled(_selectByPersistenceIdAndMaxSequenceNr _)

private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ trait BaseByteArrayJournalDao
override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] = {
// We should keep journal record with highest sequence number in order to be compliant
// with @see [[pekko.persistence.journal.JournalSpec]]
db.run(queries.delete(persistenceId, maxSequenceNr)).map(_ => ())
db.run(queries.delete(persistenceId, maxSequenceNr - 1)).map(_ => ())
}

def update(persistenceId: String, sequenceNr: Long, payload: AnyRef): Future[Done] = {
Expand Down

0 comments on commit 4fcceb1

Please sign in to comment.