perf: avoid large offset query via limit windowing #180

Merged (11 commits, May 27, 2024)
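
The change in the title targets how the journal is paged: paging with an ever-growing OFFSET forces the database to scan and discard every row before the offset, so later pages get progressively slower. Limit windowing instead advances a lower bound on the sequence number and reads a fresh LIMIT-sized window each time. The sketch below is only an illustration of that idea, not the PR's DAO code; `WindowedReadSketch`, `JournalRow`, `fetchWindow`, and the table/column names in the comments are hypothetical stand-ins.

```scala
import scala.concurrent.{ ExecutionContext, Future }

object WindowedReadSketch {

  // Hypothetical row shape, for illustration only.
  final case class JournalRow(sequenceNumber: Long, payload: Array[Byte])

  // Offset paging (the pattern the PR avoids):
  //   SELECT ... FROM event_journal WHERE persistence_id = ?
  //   ORDER BY sequence_number LIMIT :batch OFFSET :offset
  //
  // Limit windowing (the pattern the PR title describes); names are illustrative:
  //   SELECT ... FROM event_journal WHERE persistence_id = ? AND sequence_number >= :from
  //   ORDER BY sequence_number LIMIT :batch
  def readAll(
      fetchWindow: (Long, Int) => Future[Seq[JournalRow]], // (fromSequenceNr, batchSize) => one window
      fromSequenceNr: Long,
      batchSize: Int)(implicit ec: ExecutionContext): Future[Vector[JournalRow]] = {

    def loop(from: Long, acc: Vector[JournalRow]): Future[Vector[JournalRow]] =
      fetchWindow(from, batchSize).flatMap { rows =>
        if (rows.isEmpty) Future.successful(acc)
        else loop(rows.last.sequenceNumber + 1, acc ++ rows) // next window starts just past the last row seen
      }

    loop(fromSequenceNr, Vector.empty)
  }
}
```

With windowing, every query is an index seek plus a bounded scan of at most batchSize rows, so the cost per page stays roughly constant instead of growing with the offset. The hunks below show the files this PR touches.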

@@ -15,16 +15,16 @@
 package org.apache.pekko.persistence.jdbc.journal.dao

 import org.apache.pekko
-import org.apache.pekko.annotation.InternalApi
 import pekko.NotUsed
 import pekko.actor.Scheduler
+import pekko.annotation.InternalApi
 import pekko.persistence.PersistentRepr
 import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelayed, Stop }
 import pekko.stream.Materializer
 import pekko.stream.scaladsl.{ Sink, Source }

-import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ ExecutionContext, Future }
 import scala.util.{ Failure, Success, Try }

 trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
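
The trait above, BaseJournalDaoWithReadMessages, imports Pekko Streams and the FlowControl signals (Continue, ContinueDelayed, Stop), which points at a read loop that fetches events in batches and then decides whether to keep going. As a rough sketch of that general shape only, and not the project's actual implementation, a windowed stream can be driven by Source.unfoldAsync; the `fetchWindow` function and the local `Window` type here are hypothetical.

```scala
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source

import scala.concurrent.{ ExecutionContext, Future }

object LimitWindowingSketch {

  // Local stand-in for the DAO's FlowControl signals (hypothetical, illustration only).
  sealed trait Window
  case object ContinueWindow extends Window
  case object StopWindow extends Window

  // Emits events in bounded LIMIT windows; each unfold step fetches one window
  // starting just past the highest sequence number already emitted.
  def eventsByWindow(
      fetchWindow: (Long, Int) => Future[Seq[(Long, Array[Byte])]], // hypothetical DAO call: (fromSequenceNr, batchSize)
      fromSequenceNr: Long,
      batchSize: Int)(implicit ec: ExecutionContext): Source[(Long, Array[Byte]), NotUsed] =
    Source
      .unfoldAsync[(Long, Window), Seq[(Long, Array[Byte])]]((fromSequenceNr, ContinueWindow)) {
        case (_, StopWindow) => Future.successful(None)
        case (from, ContinueWindow) =>
          fetchWindow(from, batchSize).map { rows =>
            if (rows.isEmpty) None // nothing left: complete the stream
            else {
              val nextFrom = rows.map(_._1).max + 1
              val next = if (rows.size < batchSize) StopWindow else ContinueWindow // a short window is the last page
              Some(((nextFrom, next), rows))
            }
          }
      }
      .mapConcat(_.toVector) // flatten each window into individual events
}
```

Each unfold step issues one bounded query, so no single query ever needs a large offset; a short window signals the final page and completes the stream.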

@@ -20,13 +20,14 @@
 package org.apache.pekko.persistence.jdbc.journal.dao

 import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
-import org.apache.pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest.fetchSize
-import org.apache.pekko.persistence.jdbc.query.{ H2Cleaner, QueryTestSpec }
-import org.apache.pekko.persistence.{ AtomicWrite, PersistentRepr }
-import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }
-import org.apache.pekko.stream.{ Materializer, SystemMaterializer }
+import org.apache.pekko
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.slf4j.LoggerFactory
+import pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest.fetchSize
+import pekko.persistence.jdbc.query.{ H2Cleaner, QueryTestSpec }
+import pekko.persistence.{ AtomicWrite, PersistentRepr }
+import pekko.stream.scaladsl.{ Keep, Sink, Source }
+import pekko.stream.{ Materializer, SystemMaterializer }

 import java.util.UUID
 import scala.collection.immutable

@@ -15,13 +15,14 @@
 package org.apache.pekko.persistence.jdbc.query

 import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
-import org.apache.pekko.persistence.jdbc.query.JournalDaoStreamMessagesMemoryTest.fetchSize
-import org.apache.pekko.persistence.{ AtomicWrite, PersistentRepr }
-import org.apache.pekko.stream.scaladsl.{ Sink, Source }
-import org.apache.pekko.stream.testkit.scaladsl.TestSink
-import org.apache.pekko.stream.{ Materializer, SystemMaterializer }
+import org.apache.pekko
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.slf4j.LoggerFactory
+import pekko.persistence.jdbc.query.JournalDaoStreamMessagesMemoryTest.fetchSize
+import pekko.persistence.{ AtomicWrite, PersistentRepr }
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.{ Materializer, SystemMaterializer }

 import java.lang.management.{ ManagementFactory, MemoryMXBean }
 import java.util.UUID

@@ -16,12 +16,6 @@ package org.apache.pekko.persistence.jdbc.query

 import com.typesafe.config.ConfigValue
 import org.apache.pekko
-import slick.jdbc.JdbcBackend.Database
-import slick.jdbc.JdbcProfile
-
-import scala.concurrent.duration.{ FiniteDuration, _ }
-import scala.concurrent.{ ExecutionContext, Future }
-import scala.util.{ Failure, Success, Try }
 import pekko.actor.{ ActorRef, ActorSystem, ExtendedActorSystem, Props, Stash, Status }
 import pekko.event.LoggingReceive
 import pekko.pattern.ask

@@ -41,8 +35,13 @@ import pekko.stream.testkit.TestSubscriber
 import pekko.stream.testkit.javadsl.{ TestSink => JavaSink }
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.stream.{ Materializer, SystemMaterializer }
+import slick.jdbc.JdbcBackend.Database
+import slick.jdbc.JdbcProfile

 import scala.collection.immutable
+import scala.concurrent.duration.{ FiniteDuration, _ }
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.util.{ Failure, Success }

 trait ReadJournalOperations {
   def withCurrentPersistenceIds(within: FiniteDuration = 60.second)(f: TestSubscriber.Probe[String] => Unit): Unit