From 08f3b4d1999d4dca56f865c969675f97f554aed7 Mon Sep 17 00:00:00 2001
From: JingZhang Chen
Date: Mon, 27 May 2024 18:38:09 +0800
Subject: [PATCH] fix oracle insert issue

---
 .../journal/dao/BaseJournalDaoWithReadMessages.scala | 6 +++---
 .../jdbc/journal/dao/LimitWindowingStreamTest.scala  | 9 +++++----
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index 6683e4e5..30f35697 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -51,12 +51,12 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
       toSequenceNr: Long,
       batchSize: Int,
       refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
-    val firstSequenceVer: Long = Math.max(1, fromSequenceNr)
+    val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
     Source
-      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceVer, Continue)) {
+      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceNr, Continue)) {
         case (from, control) =>
           def limitWindow(from: Long): Long = {
-            if (from == firstSequenceVer || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
+            if (from == firstSequenceNr || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
               toSequenceNr
             } else {
               Math.min(from + batchSize, toSequenceNr)
diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
index e9e8ccdb..be2679fc 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
@@ -50,7 +50,8 @@ abstract class LimitWindowingStreamTest(configFile: String)
     implicit val mat: Materializer = SystemMaterializer(system).materializer

     val persistenceId = UUID.randomUUID().toString
-    val payload = 'a'.toByte
+    val writerUuid = UUID.randomUUID().toString
+    val payload = Array.fill(16)('a'.toByte)
     val eventsPerBatch = 1000
     val numberOfInsertBatches = 16
     val totalMessages = numberOfInsertBatches * eventsPerBatch
@@ -58,14 +59,14 @@ abstract class LimitWindowingStreamTest(configFile: String)
     withDao { dao =>
       val lastInsert =
         Source
-          .fromIterator(() => (1 to numberOfInsertBatches).toIterator)
+          .fromIterator(() => (1 to numberOfInsertBatches).iterator)
           .mapAsync(1) { i =>
             val end = i * eventsPerBatch
             val start = end - (eventsPerBatch - 1)
-            log.info(s"batch $i (events from $start to $end")
+            log.info(s"batch $i - events from $start to $end")
             val atomicWrites = (start to end).map { j =>
-              AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
+              AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid)))
             }
             dao.asyncWriteMessages(atomicWrites).map(_ => i)
           }
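
Note: the snippet below is a minimal, self-contained sketch (not part of the patch) of the write shape the updated test now exercises: each PersistentRepr carries an explicit writerUuid and a small byte-array payload instead of a single Byte. The object and method names (BatchWriteSketch, buildBatch) are illustrative only.

// Sketch of the batched-write construction used in the test above.
import java.util.UUID
import scala.collection.immutable
import org.apache.pekko.persistence.{ AtomicWrite, PersistentRepr }

object BatchWriteSketch {
  val persistenceId: String = UUID.randomUUID().toString
  val writerUuid: String = UUID.randomUUID().toString    // writer id attached to every event
  val payload: Array[Byte] = Array.fill(16)('a'.toByte)  // byte-array payload instead of a single Byte

  // One AtomicWrite per event for sequence numbers start..end inclusive,
  // mirroring the test's (start to end).map { j => ... } loop.
  def buildBatch(start: Long, end: Long): immutable.Seq[AtomicWrite] =
    (start to end).map { seqNr =>
      AtomicWrite(immutable.Seq(PersistentRepr(payload, seqNr, persistenceId, writerUuid = writerUuid)))
    }
}

A journal DAO call such as dao.asyncWriteMessages(BatchWriteSketch.buildBatch(1, 1000)) would then persist one batch, corresponding to one step of the test's mapAsync(1) stage.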