Skip to content

Commit

Permalink
perf: avoid large offset query via limit windowing
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed Apr 19, 2024
1 parent 333613b commit 5ceee04
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
Source
.unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((Math.max(1, fromSequenceNr), Continue)) {
case (from, control) =>
def limitWindow(from: Long): Long = {
math.min(from + batchSize, toSequenceNr)
}

def retrieveNextBatch(): Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
for {
xs <- messages(persistenceId, from, toSequenceNr, batchSize).runWith(Sink.seq)
xs <- messages(persistenceId, from, limitWindow(from), batchSize).runWith(Sink.seq)
} yield {
val hasMoreEvents = xs.size == batchSize
// Events are ordered by sequence number, therefore the last one is the largest)
Expand Down Expand Up @@ -77,7 +81,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
pekko.pattern.after(delay, scheduler)(retrieveNextBatch())
}
}
.mapConcat(identity(_))
.mapConcat(identity)
}

}

0 comments on commit 5ceee04

Please sign in to comment.