From 689038f78fd61e62d9183304faf9581aaaae4916 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 19 Apr 2024 11:17:51 +0800 Subject: [PATCH] optimized query --- .../jdbc/journal/dao/JournalQueries.scala | 20 ++----------------- .../journal/dao/legacy/JournalQueries.scala | 19 ++---------------- 2 files changed, 4 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala index cb55b4ed..dab31870 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala @@ -51,30 +51,14 @@ class JournalQueries( } def delete(persistenceId: String, toSequenceNr: Long) = { - selectByPersistenceIdAndMaxSequenceNr(persistenceId, toSequenceNr).delete + JournalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete } - private def _selectAllJournalForPersistenceId(persistenceId: Rep[String]) = - _selectByPersistenceId(persistenceId).sortBy(_.sequenceNumber.desc) - - private def _selectByPersistenceId(persistenceId: Rep[String]) = - JournalTable.filter(_.persistenceId === persistenceId) - - private def _selectByPersistenceIdAndMaxSequenceNr(persistenceId: Rep[String], maxSeqNr: Rep[Long]) = - _selectByPersistenceId(persistenceId).filter(_.sequenceNumber <= maxSeqNr) - - val selectByPersistenceIdAndMaxSequenceNr = Compiled(_selectByPersistenceIdAndMaxSequenceNr _) - private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] = - _selectAllJournalForPersistenceId(persistenceId).take(1).map(_.sequenceNumber).max + JournalTable.filter(_.persistenceId === persistenceId).take(1).map(_.sequenceNumber).max val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _) - private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] = - JournalTable.map(_.persistenceId).distinct - - val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct) - private def _messagesQuery( persistenceId: Rep[String], fromSequenceNr: Rep[Long], diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/JournalQueries.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/JournalQueries.scala index e895cfde..3ae90968 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/JournalQueries.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/legacy/JournalQueries.scala @@ -27,11 +27,8 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Leg def writeJournalRows(xs: Seq[JournalRow]) = JournalTableC ++= xs.sortBy(_.sequenceNumber) - private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) = - _selectByPersistenceId(persistenceId).sortBy(_.sequenceNumber.desc) - def delete(persistenceId: String, toSequenceNr: Long) = { - selectByPersistenceIdAndMaxSequenceNr(persistenceId, toSequenceNr).delete + JournalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete } /** @@ -44,23 +41,11 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Leg baseQuery.map(_.message).update(replacement) } - private def _selectByPersistenceId(persistenceId: Rep[String]) = - JournalTable.filter(_.persistenceId === persistenceId) - - private def _selectByPersistenceIdAndMaxSequenceNr(persistenceId: Rep[String], maxSeqNr: Rep[Long]) = - _selectByPersistenceId(persistenceId).filter(_.sequenceNumber <= maxSeqNr) - val selectByPersistenceIdAndMaxSequenceNr = Compiled(_selectByPersistenceIdAndMaxSequenceNr _) - private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] = - selectAllJournalForPersistenceId(persistenceId).take(1).map(_.sequenceNumber).max + JournalTable.filter(_.persistenceId === persistenceId).take(1).map(_.sequenceNumber).max val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _) - private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] = - JournalTable.map(_.persistenceId).distinct - - val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct) - private def _messagesQuery( persistenceId: Rep[String], fromSequenceNr: Rep[Long],