diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index dee3ba67..629ac0c4 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -17,14 +17,14 @@ package org.apache.pekko.persistence.jdbc.journal.dao
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.Scheduler
+import pekko.annotation.InternalApi
 import pekko.persistence.PersistentRepr
 import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelayed, Stop }
 import pekko.stream.Materializer
 import pekko.stream.scaladsl.{ Sink, Source }
 
-import scala.collection.immutable.Seq
-import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ ExecutionContext, Future }
 import scala.util.{ Failure, Success, Try }
 
 trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
@@ -38,13 +38,29 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
       toSequenceNr: Long,
       batchSize: Int,
       refreshInterval: Option[(FiniteDuration, Scheduler)]): Source[Try[(PersistentRepr, Long)], NotUsed] = {
+    internalBatchStream(persistenceId, fromSequenceNr, toSequenceNr, batchSize, refreshInterval).mapConcat(identity)
+  }
 
+  /**
+   * Separated out so the batching stream can be unit tested.
+   */
+  @InternalApi
+  private[dao] def internalBatchStream(
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long,
+      batchSize: Int,
+      refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
     Source
      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((Math.max(1, fromSequenceNr), Continue)) {
        case (from, control) =>
+          def limitWindow(from: Long): Long = {
+            math.min(from + batchSize, toSequenceNr)
+          }
+
          def retrieveNextBatch(): Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
            for {
-              xs <- messages(persistenceId, from, toSequenceNr, batchSize).runWith(Sink.seq)
+              xs <- messages(persistenceId, from, limitWindow(from), batchSize).runWith(Sink.seq)
            } yield {
              val hasMoreEvents = xs.size == batchSize
              // Events are ordered by sequence number, therefore the last one is the largest)
@@ -77,7 +93,6 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
              pekko.pattern.after(delay, scheduler)(retrieveNextBatch())
          }
      }
-      .mapConcat(identity(_))
  }
 
 }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
new file mode 100644
index 00000000..e9e8ccdb
--- /dev/null
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.journal.dao
+
+import org.apache.pekko
+import pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest.fetchSize
+import pekko.persistence.jdbc.query.{ H2Cleaner, QueryTestSpec }
+import pekko.persistence.{ AtomicWrite, PersistentRepr }
+import pekko.stream.scaladsl.{ Keep, Sink, Source }
+import pekko.stream.{ Materializer, SystemMaterializer }
+import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.slf4j.LoggerFactory
+
+import java.util.UUID
+import scala.collection.immutable
+import scala.concurrent.duration._
+import scala.concurrent.{ Await, ExecutionContext, Future }
+
+object LimitWindowingStreamTest {
+  val fetchSize = 100
+  val configOverrides: Map[String, ConfigValue] =
+    Map("jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef(fetchSize))
+}
+
+abstract class LimitWindowingStreamTest(configFile: String)
+    extends QueryTestSpec(configFile, LimitWindowingStreamTest.configOverrides) {
+
+  private val log = LoggerFactory.getLogger(this.getClass)
+
+  it should "stream events with limit windowing" in withActorSystem { implicit system =>
+    implicit val ec: ExecutionContext = system.dispatcher
+    implicit val mat: Materializer = SystemMaterializer(system).materializer
+
+    val persistenceId = UUID.randomUUID().toString
+    val payload = 'a'.toByte
+    val eventsPerBatch = 1000
+    val numberOfInsertBatches = 16
+    val totalMessages = numberOfInsertBatches * eventsPerBatch
+
+    withDao { dao =>
+      val lastInsert =
+        Source
+          .fromIterator(() => (1 to numberOfInsertBatches).iterator)
+          .mapAsync(1) { i =>
+            val end = i * eventsPerBatch
+            val start = end - (eventsPerBatch - 1)
+            log.info(s"batch $i - events from $start to $end")
+            val atomicWrites =
+              (start to end).map { j =>
+                AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
+              }
+            dao.asyncWriteMessages(atomicWrites).map(_ => i)
+          }
+          .runWith(Sink.last)
+
+      lastInsert.futureValue(Timeout(totalMessages.seconds))
+      val readMessagesDao = dao.asInstanceOf[BaseJournalDaoWithReadMessages]
+      val messagesSrc =
+        readMessagesDao.internalBatchStream(persistenceId, 0, totalMessages, batchSize = fetchSize, None)
+
+      val eventualSum: Future[(Int, Int)] = messagesSrc.toMat(Sink.fold((0, 0)) { case ((accBatch, accTotal), seq) =>
+        (accBatch + 1, accTotal + seq.size)
+      })(Keep.right).run()
+
+      val (batchCount, totalCount) = Await.result(eventualSum, Duration.Inf)
+      val totalBatch = totalMessages / fetchSize
+      batchCount shouldBe totalBatch
+      totalCount shouldBe totalMessages
+    }
+  }
+}
+
+class H2LimitWindowingStreamTest extends LimitWindowingStreamTest("h2-application.conf") with H2Cleaner
diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
index 02625e70..216d7b83 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala @@ -14,16 +14,15 @@ package org.apache.pekko.persistence.jdbc.query -import java.lang.management.ManagementFactory -import java.lang.management.MemoryMXBean +import java.lang.management.{ ManagementFactory, MemoryMXBean } import java.util.UUID import org.apache.pekko -import pekko.actor.ActorSystem +import pekko.persistence.jdbc.query.JournalDaoStreamMessagesMemoryTest.fetchSize import pekko.persistence.{ AtomicWrite, PersistentRepr } -import pekko.persistence.jdbc.journal.dao.legacy.{ ByteArrayJournalDao, JournalTables } -import pekko.serialization.SerializationExtension import pekko.stream.scaladsl.{ Sink, Source } +import pekko.stream.testkit.scaladsl.TestSink +import pekko.stream.{ Materializer, SystemMaterializer } import com.typesafe.config.{ ConfigValue, ConfigValueFactory } import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.slf4j.LoggerFactory @@ -32,120 +31,110 @@ import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.util.{ Failure, Success } -import pekko.stream.testkit.scaladsl.TestSink -import org.scalatest.matchers.should.Matchers object JournalDaoStreamMessagesMemoryTest { - val configOverrides: Map[String, ConfigValue] = Map("jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100")) + val fetchSize: Int = 100 + val MB: Int = 1024 * 1024 - val MB = 1024 * 1024 + val configOverrides: Map[String, ConfigValue] = Map( + "jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100")) } abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) - extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides) - with JournalTables - with Matchers { + extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides) { + import JournalDaoStreamMessagesMemoryTest.MB private val log = LoggerFactory.getLogger(this.getClass) - val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration - val journalTableCfg = journalConfig.journalTableConfiguration + val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean - implicit val askTimeout: FiniteDuration = 50.millis + it should "stream events" in withActorSystem { implicit system => + implicit val ec: ExecutionContext = system.dispatcher + implicit val mat: Materializer = SystemMaterializer(system).materializer - def generateId: Int = 0 + withDao { dao => + val persistenceId = UUID.randomUUID().toString - val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean + val writerUuid = UUID.randomUUID().toString + + val payloadSize = 5000 // 5000 bytes + val eventsPerBatch = 1000 - behavior.of("Replaying Persistence Actor") - - it should "stream events" in { - if (newDao) - pending - withActorSystem { implicit system: ActorSystem => - withDatabase { db => - implicit val ec: ExecutionContext = system.dispatcher - - val persistenceId = UUID.randomUUID().toString - val dao = new ByteArrayJournalDao(db, profile, journalConfig, SerializationExtension(system)) - - val payloadSize = 5000 // 5000 bytes - val eventsPerBatch = 1000 - - val maxMem = 64 * MB - - val numberOfInsertBatches = { - // calculate the number of batches using a factor to make sure we go a little bit over the limit - (maxMem / (payloadSize * eventsPerBatch) * 1.2).round.toInt - } - val totalMessages = numberOfInsertBatches * eventsPerBatch - val totalMessagePayload = totalMessages * 
payloadSize - log.info( - s"batches: $numberOfInsertBatches (with $eventsPerBatch events), total messages: $totalMessages, total msgs size: $totalMessagePayload") - - // payload can be the same when inserting to avoid unnecessary memory usage - val payload = Array.fill(payloadSize)('a'.toByte) - - val lastInsert = - Source - .fromIterator(() => (1 to numberOfInsertBatches).toIterator) - .mapAsync(1) { i => - val end = i * eventsPerBatch - val start = end - (eventsPerBatch - 1) - log.info(s"batch $i - events from $start to $end") - val atomicWrites = - (start to end).map { j => - AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId))) - }.toSeq - - dao.asyncWriteMessages(atomicWrites).map(_ => i) - } - .runWith(Sink.last) - - // wait until we write all messages - // being very generous, 1 second per message - lastInsert.futureValue(Timeout(totalMessages.seconds)) - - log.info("Events written, starting replay") - - // sleep and gc to have some kind of stable measurement of current heap usage - Thread.sleep(1000) - System.gc() - Thread.sleep(1000) - val usedBefore = memoryMBean.getHeapMemoryUsage.getUsed - - val messagesSrc = - dao.messagesWithBatch(persistenceId, 0, totalMessages, batchSize = 100, None) - val probe = - messagesSrc - .map { - case Success((repr, _)) => - if (repr.sequenceNr % 100 == 0) - log.info(s"fetched: ${repr.persistenceId} - ${repr.sequenceNr}/$totalMessages") - case Failure(exception) => - log.error("Failure when reading messages.", exception) - } - .runWith(TestSink.probe) - - probe.request(10) - probe.within(20.seconds) { - probe.expectNextN(10) - } - - // sleep and gc to have some kind of stable measurement of current heap usage - Thread.sleep(2000) - System.gc() - Thread.sleep(1000) - val usedAfter = memoryMBean.getHeapMemoryUsage.getUsed - - log.info(s"Used heap before ${usedBefore / MB} MB, after ${usedAfter / MB} MB") - // actual usage is much less than 10 MB - (usedAfter - usedBefore) should be <= (10L * MB) - - probe.cancel() + val maxMem = 64 * MB + + val numberOfInsertBatches = { + // calculate the number of batches using a factor to make sure we go a little bit over the limit + (maxMem / (payloadSize * eventsPerBatch) * 1.2).round.toInt + } + val totalMessages = numberOfInsertBatches * eventsPerBatch + val totalMessagePayload = totalMessages * payloadSize + log.info( + s"batches: $numberOfInsertBatches (with $eventsPerBatch events), total messages: $totalMessages, total msgs size: $totalMessagePayload") + + // payload can be the same when inserting to avoid unnecessary memory usage + val payload = Array.fill(payloadSize)('a'.toByte) + + val lastInsert = + Source + .fromIterator(() => (1 to numberOfInsertBatches).iterator) + .mapAsync(1) { i => + val end = i * eventsPerBatch + val start = end - (eventsPerBatch - 1) + log.info(s"batch $i - events from $start to $end") + val atomicWrites = + (start to end).map { j => + AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid))) + } + dao.asyncWriteMessages(atomicWrites).map(_ => i) + } + .runWith(Sink.last) + + // wait until we write all messages + // being very generous, 1 second per message + lastInsert.futureValue(Timeout(totalMessages.seconds)) + + log.info("Events written, starting replay") + + // sleep and gc to have some kind of stable measurement of current heap usage + Thread.sleep(1000) + System.gc() + Thread.sleep(1000) + val usedBefore = memoryMBean.getHeapMemoryUsage.getUsed + + val messagesSrc = + dao.messagesWithBatch(persistenceId, 0, 
totalMessages, batchSize = fetchSize, None) + val probe = + messagesSrc + .map { + case Success((repr, _)) => + if (repr.sequenceNr % 100 == 0) + log.info(s"fetched: ${repr.persistenceId} - ${repr.sequenceNr}/$totalMessages") + case Failure(exception) => + log.error("Failure when reading messages.", exception) + } + .runWith(TestSink.probe) + + probe.request(10) + probe.within(20.seconds) { + probe.expectNextN(10) } + + // sleep and gc to have some kind of stable measurement of current heap usage + Thread.sleep(2000) + System.gc() + Thread.sleep(1000) + val usedAfter = memoryMBean.getHeapMemoryUsage.getUsed + + log.info(s"Used heap before ${usedBefore / MB} MB, after ${usedAfter / MB} MB") + // actual usage is much less than 10 MB + (usedAfter - usedBefore) should be <= (10L * MB) + + probe.cancel() } } } + +class H2JournalDaoStreamMessagesMemoryTest extends JournalDaoStreamMessagesMemoryTest("h2-application.conf") + with H2Cleaner diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala index 9c4290ce..c299fbe9 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala @@ -15,30 +15,33 @@ package org.apache.pekko.persistence.jdbc.query import org.apache.pekko -import pekko.actor.{ ActorRef, ActorSystem, Props, Stash, Status } -import pekko.pattern.ask +import pekko.actor.{ ActorRef, ActorSystem, ExtendedActorSystem, Props, Stash, Status } import pekko.event.LoggingReceive -import pekko.persistence.{ DeleteMessagesFailure, DeleteMessagesSuccess, PersistentActor } +import pekko.pattern.ask import pekko.persistence.jdbc.SingleActorSystemPerTestSpec +import pekko.persistence.jdbc.config.JournalConfig +import pekko.persistence.jdbc.journal.dao.JournalDao import pekko.persistence.jdbc.query.EventAdapterTest.{ Event, TaggedAsyncEvent, TaggedEvent } import pekko.persistence.jdbc.query.javadsl.{ JdbcReadJournal => JavaJdbcReadJournal } import pekko.persistence.jdbc.query.scaladsl.JdbcReadJournal +import pekko.persistence.jdbc.testkit.internal._ import pekko.persistence.journal.Tagged import pekko.persistence.query.{ EventEnvelope, Offset, PersistenceQuery } +import pekko.persistence.{ DeleteMessagesFailure, DeleteMessagesSuccess, PersistentActor } +import pekko.serialization.{ Serialization, SerializationExtension } import pekko.stream.scaladsl.Sink import pekko.stream.testkit.TestSubscriber import pekko.stream.testkit.javadsl.{ TestSink => JavaSink } import pekko.stream.testkit.scaladsl.TestSink import pekko.stream.{ Materializer, SystemMaterializer } import com.typesafe.config.ConfigValue +import slick.jdbc.JdbcBackend.Database +import slick.jdbc.JdbcProfile -import scala.concurrent.{ ExecutionContext, Future } +import scala.collection.immutable import scala.concurrent.duration.{ FiniteDuration, _ } -import pekko.persistence.jdbc.testkit.internal.H2 -import pekko.persistence.jdbc.testkit.internal.MySQL -import pekko.persistence.jdbc.testkit.internal.Oracle -import pekko.persistence.jdbc.testkit.internal.Postgres -import pekko.persistence.jdbc.testkit.internal.SqlServer +import scala.concurrent.{ ExecutionContext, Future } +import scala.util.{ Failure, Success } trait ReadJournalOperations { def withCurrentPersistenceIds(within: FiniteDuration = 60.second)(f: TestSubscriber.Probe[String] => Unit): Unit @@ -337,6 +340,23 @@ abstract class QueryTestSpec(config: 
String, configOverrides: Map[String, Config def withTags(payload: Any, tags: String*) = Tagged(payload, Set(tags: _*)) + def withDao(f: JournalDao => Unit)(implicit system: ActorSystem, ec: ExecutionContext, mat: Materializer): Unit = { + val fqcn: String = journalConfig.pluginConfig.dao + val args: immutable.Seq[(Class[_], AnyRef)] = immutable.Seq( + (classOf[Database], db), + (classOf[JdbcProfile], profile), + (classOf[JournalConfig], journalConfig), + (classOf[Serialization], SerializationExtension(system)), + (classOf[ExecutionContext], ec), + (classOf[Materializer], mat)) + val journalDao: JournalDao = + system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn, args) match { + case Success(dao) => dao + case Failure(cause) => throw cause + } + f(journalDao) + } + } trait PostgresCleaner extends QueryTestSpec {