Skip to content

Commit

Permalink
force valid in limit windowing
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed May 27, 2024
1 parent ab2f6ce commit 7d4bdd4
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,16 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
toSequenceNr: Long,
batchSize: Int,
refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
val initializedQueryState: Long = Math.max(1, fromSequenceNr)
Source
.unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((Math.max(1, fromSequenceNr), Continue)) {
.unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((initializedQueryState, Continue)) {
case (from, control) =>
def limitWindow(from: Long): Long = {
math.min(from + batchSize, toSequenceNr)
if (batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
toSequenceNr
} else {
Math.min(from + batchSize, toSequenceNr)
}
}

def retrieveNextBatch(): Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
Expand Down

0 comments on commit 7d4bdd4

Please sign in to comment.