Skip to content

Commit

Permalink
fix oracle insert issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed May 27, 2024
1 parent 114c25d commit 08f3b4d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
toSequenceNr: Long,
batchSize: Int,
refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
val firstSequenceVer: Long = Math.max(1, fromSequenceNr)
val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
Source
.unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceVer, Continue)) {
.unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceNr, Continue)) {
case (from, control) =>
def limitWindow(from: Long): Long = {
if (from == firstSequenceVer || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
if (from == firstSequenceNr || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
toSequenceNr
} else {
Math.min(from + batchSize, toSequenceNr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,23 @@ abstract class LimitWindowingStreamTest(configFile: String)
implicit val mat: Materializer = SystemMaterializer(system).materializer

val persistenceId = UUID.randomUUID().toString
val payload = 'a'.toByte
val writerUuid = UUID.randomUUID().toString
val payload = Array.fill(16)('a'.toByte)
val eventsPerBatch = 1000
val numberOfInsertBatches = 16
val totalMessages = numberOfInsertBatches * eventsPerBatch

withDao { dao =>
val lastInsert =
Source
.fromIterator(() => (1 to numberOfInsertBatches).toIterator)
.fromIterator(() => (1 to numberOfInsertBatches).iterator)
.mapAsync(1) { i =>
val end = i * eventsPerBatch
val start = end - (eventsPerBatch - 1)
log.info(s"batch $i (events from $start to $end")
log.info(s"batch $i - events from $start to $end")
val atomicWrites =
(start to end).map { j =>
AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid)))
}
dao.asyncWriteMessages(atomicWrites).map(_ => i)
}
Expand Down

0 comments on commit 08f3b4d

Please sign in to comment.