diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index 15916c7f..a14b6d71 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -23,7 +23,6 @@ import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelaye
 import pekko.stream.Materializer
 import pekko.stream.scaladsl.{ Sink, Source }
 
-import scala.collection.immutable.Seq
 import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration.FiniteDuration
 import scala.util.{ Failure, Success, Try }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
index 9cb41688..5139b82e 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
@@ -23,8 +23,8 @@ import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
 import org.apache.pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest.fetchSize
 import org.apache.pekko.persistence.jdbc.query.{ H2Cleaner, QueryTestSpec }
 import org.apache.pekko.persistence.{ AtomicWrite, PersistentRepr }
-import org.apache.pekko.serialization.SerializationExtension
 import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }
+import org.apache.pekko.stream.{ Materializer, SystemMaterializer }
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.slf4j.LoggerFactory
 
@@ -32,6 +32,7 @@ import java.util.UUID
 import scala.collection.immutable
 import scala.concurrent.duration._
 import scala.concurrent.{ Await, ExecutionContext, Future }
+
 object LimitWindowingStreamTest {
   val fetchSize = 100
   val configOverrides: Map[String, ConfigValue] =
@@ -44,17 +45,16 @@ abstract class LimitWindowingStreamTest(configFile: String)
   private val log = LoggerFactory.getLogger(this.getClass)
 
   it should "stream events with limit windowing" in withActorSystem { implicit system =>
+    implicit val ec: ExecutionContext = system.dispatcher
+    implicit val mat: Materializer = SystemMaterializer(system).materializer
+
     val persistenceId = UUID.randomUUID().toString
     val payload = 'a'.toByte
     val eventsPerBatch = 1000
     val numberOfInsertBatches = 16
     val totalMessages = numberOfInsertBatches * eventsPerBatch
 
-    withDatabase { db =>
-      implicit val ec: ExecutionContext = system.dispatcher
-
-      val dao = new DefaultJournalDao(db, profile, journalConfig, SerializationExtension(system))
-
+    withDao { dao =>
       val lastInsert =
         Source
           .fromIterator(() => (1 to numberOfInsertBatches).toIterator)
@@ -71,8 +71,9 @@ abstract class LimitWindowingStreamTest(configFile: String)
           .runWith(Sink.last)
 
       lastInsert.futureValue(Timeout(totalMessages.seconds))
-
-      val messagesSrc = dao.internalBatchStream(persistenceId, 0, totalMessages, batchSize = fetchSize, None)
+      val readMessagesDao = dao.asInstanceOf[BaseJournalDaoWithReadMessages]
+      val messagesSrc =
+        readMessagesDao.internalBatchStream(persistenceId, 0, totalMessages, batchSize = fetchSize, None)
 
       val eventualSum: Future[(Int, Int)] = messagesSrc.toMat(Sink.fold((0, 0)) { case ((accBatch, accTotal), seq) =>
        (accBatch + 1, accTotal + seq.size)
diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
index dca2878f..ccb8c4c1 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
@@ -15,9 +15,10 @@
 package org.apache.pekko.persistence.jdbc.query
 
 import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
-import org.apache.pekko.actor.ExtendedActorSystem
+import org.apache.pekko.actor.{ ActorRef, ExtendedActorSystem }
+import org.apache.pekko.pattern.ask
 import org.apache.pekko.persistence.jdbc.config.JournalConfig
-import org.apache.pekko.persistence.jdbc.journal.dao.JournalDao
+import org.apache.pekko.persistence.jdbc.journal.dao.{ DefaultJournalDao, JournalDao }
 import org.apache.pekko.persistence.jdbc.query.JournalDaoStreamMessagesMemoryTest.fetchSize
 import org.apache.pekko.persistence.{ AtomicWrite, PersistentRepr }
 import org.apache.pekko.serialization.{ Serialization, SerializationExtension }
@@ -55,24 +56,10 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
 
   val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
 
   it should "stream events" in withActorSystem { implicit system =>
-    withDatabase { db =>
-      implicit val ec: ExecutionContext = system.dispatcher
-      implicit val mat: Materializer = SystemMaterializer(system).materializer
-
-      val fqcn = journalConfig.pluginConfig.dao
-      val args = Seq(
-        (classOf[Database], db),
-        (classOf[JdbcProfile], profile),
-        (classOf[JournalConfig], journalConfig),
-        (classOf[Serialization], SerializationExtension(system)),
-        (classOf[ExecutionContext], ec),
-        (classOf[Materializer], mat))
-      val dao: JournalDao =
-        system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn, args) match {
-          case Success(dao) => dao
-          case Failure(cause) => throw cause
-        }
+    implicit val ec: ExecutionContext = system.dispatcher
+    implicit val mat: Materializer = SystemMaterializer(system).materializer
 
+    withDao { dao =>
       val persistenceId = UUID.randomUUID().toString
       val payloadSize = 5000 // 5000 bytes
@@ -102,7 +89,7 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
 
             val atomicWrites = (start to end).map { j =>
               AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
-            }.toSeq
+            }
 
             dao.asyncWriteMessages(atomicWrites).map(_ => i)
           }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
index 9c4290ce..78f74ffb 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
@@ -14,31 +14,32 @@
+import com.typesafe.config.ConfigValue
 import org.apache.pekko
-import pekko.actor.{ ActorRef, ActorSystem, Props, Stash, Status }
-import pekko.pattern.ask
+import slick.jdbc.JdbcBackend.Database
+import slick.jdbc.JdbcProfile
+import scala.concurrent.duration.{FiniteDuration, _}
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+import pekko.actor.{ActorRef, ActorSystem, ExtendedActorSystem, Props, Stash, Status}
 import pekko.event.LoggingReceive
-import pekko.persistence.{ DeleteMessagesFailure, DeleteMessagesSuccess, PersistentActor }
+import pekko.pattern.ask
 import pekko.persistence.jdbc.SingleActorSystemPerTestSpec
-import pekko.persistence.jdbc.query.EventAdapterTest.{ Event, TaggedAsyncEvent, TaggedEvent }
-import pekko.persistence.jdbc.query.javadsl.{ JdbcReadJournal => JavaJdbcReadJournal }
+import pekko.persistence.jdbc.config.JournalConfig
+import pekko.persistence.jdbc.journal.dao.JournalDao
+import pekko.persistence.jdbc.query.EventAdapterTest.{Event, TaggedAsyncEvent, TaggedEvent}
+import pekko.persistence.jdbc.query.javadsl.{JdbcReadJournal => JavaJdbcReadJournal}
 import pekko.persistence.jdbc.query.scaladsl.JdbcReadJournal
+import pekko.persistence.jdbc.testkit.internal._
 import pekko.persistence.journal.Tagged
-import pekko.persistence.query.{ EventEnvelope, Offset, PersistenceQuery }
+import pekko.persistence.query.{EventEnvelope, Offset, PersistenceQuery}
+import pekko.persistence.{DeleteMessagesFailure, DeleteMessagesSuccess, PersistentActor}
+import pekko.serialization.{Serialization, SerializationExtension}
 import pekko.stream.scaladsl.Sink
 import pekko.stream.testkit.TestSubscriber
-import pekko.stream.testkit.javadsl.{ TestSink => JavaSink }
+import pekko.stream.testkit.javadsl.{TestSink => JavaSink}
 import pekko.stream.testkit.scaladsl.TestSink
-import pekko.stream.{ Materializer, SystemMaterializer }
-import com.typesafe.config.ConfigValue
-
-import scala.concurrent.{ ExecutionContext, Future }
-import scala.concurrent.duration.{ FiniteDuration, _ }
-import pekko.persistence.jdbc.testkit.internal.H2
-import pekko.persistence.jdbc.testkit.internal.MySQL
-import pekko.persistence.jdbc.testkit.internal.Oracle
-import pekko.persistence.jdbc.testkit.internal.Postgres
-import pekko.persistence.jdbc.testkit.internal.SqlServer
+import pekko.stream.{Materializer, SystemMaterializer}
 
 trait ReadJournalOperations {
   def withCurrentPersistenceIds(within: FiniteDuration = 60.second)(f: TestSubscriber.Probe[String] => Unit): Unit
@@ -337,6 +338,23 @@ abstract class QueryTestSpec(config: String, configOverrides: Map[String, Config
 
   def withTags(payload: Any, tags: String*) = Tagged(payload, Set(tags: _*))
 
+  def withDao(f: JournalDao => Unit)(implicit system: ActorSystem, ec: ExecutionContext, mat: Materializer): Unit = {
+    val fqcn = journalConfig.pluginConfig.dao
+    val args = Seq(
+      (classOf[Database], db),
+      (classOf[JdbcProfile], profile),
+      (classOf[JournalConfig], journalConfig),
+      (classOf[Serialization], SerializationExtension(system)),
+      (classOf[ExecutionContext], ec),
+      (classOf[Materializer], mat))
+    val journalDao: JournalDao =
+      system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn, args) match {
+        case Success(dao) => dao
+        case Failure(cause) => throw cause
+      }
+    f(journalDao)
+  }
+
 }
 
 trait PostgresCleaner extends QueryTestSpec {
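
For reviewers, a minimal sketch of how a spec extending QueryTestSpec can drive the new withDao helper; the test name, payload, sequence number, and timeout below are invented for illustration, while withActorSystem, withDao, AtomicWrite, PersistentRepr, and JournalDao.asyncWriteMessages are the APIs this patch touches:

  it should "write a batch through the configured dao" in withActorSystem { implicit system =>
    implicit val ec: ExecutionContext = system.dispatcher
    implicit val mat: Materializer = SystemMaterializer(system).materializer

    withDao { dao =>
      val persistenceId = UUID.randomUUID().toString
      // One atomic batch containing a single event; asyncWriteMessages takes
      // immutable.Seq[AtomicWrite] and yields Future[immutable.Seq[Try[Unit]]],
      // one Try per batch.
      val write = AtomicWrite(immutable.Seq(PersistentRepr("example-payload", 1L, persistenceId)))
      val done = dao.asyncWriteMessages(immutable.Seq(write))
      Await.result(done, 10.seconds) // illustrative timeout
    }
  }

Because withDao instantiates the DAO reflectively from journalConfig.pluginConfig.dao via dynamicAccess.createInstanceFor, the same test body runs unchanged against whichever JournalDao implementation the loaded config names.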