
Commit

optimized it
Roiocam committed Apr 26, 2024
1 parent a44ac5c commit fb403b3
Showing 4 changed files with 51 additions and 46 deletions.
@@ -23,7 +23,6 @@ import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelaye
import pekko.stream.Materializer
import pekko.stream.scaladsl.{ Sink, Source }

import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.FiniteDuration
import scala.util.{ Failure, Success, Try }
@@ -23,15 +23,16 @@ import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
import org.apache.pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest.fetchSize
import org.apache.pekko.persistence.jdbc.query.{ H2Cleaner, QueryTestSpec }
import org.apache.pekko.persistence.{ AtomicWrite, PersistentRepr }
import org.apache.pekko.serialization.SerializationExtension
import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }
import org.apache.pekko.stream.{ Materializer, SystemMaterializer }
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.slf4j.LoggerFactory

import java.util.UUID
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }

object LimitWindowingStreamTest {
val fetchSize = 100
val configOverrides: Map[String, ConfigValue] =
@@ -44,17 +45,16 @@ abstract class LimitWindowingStreamTest(configFile: String)
private val log = LoggerFactory.getLogger(this.getClass)

it should "stream events with limit windowing" in withActorSystem { implicit system =>
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat: Materializer = SystemMaterializer(system).materializer

val persistenceId = UUID.randomUUID().toString
val payload = 'a'.toByte
val eventsPerBatch = 1000
val numberOfInsertBatches = 16
val totalMessages = numberOfInsertBatches * eventsPerBatch

withDatabase { db =>
implicit val ec: ExecutionContext = system.dispatcher

val dao = new DefaultJournalDao(db, profile, journalConfig, SerializationExtension(system))

withDao { dao =>
val lastInsert =
Source
.fromIterator(() => (1 to numberOfInsertBatches).toIterator)
@@ -71,8 +71,9 @@ abstract class LimitWindowingStreamTest(configFile: String)
.runWith(Sink.last)

lastInsert.futureValue(Timeout(totalMessages.seconds))

val messagesSrc = dao.internalBatchStream(persistenceId, 0, totalMessages, batchSize = fetchSize, None)
val readMessagesDao = dao.asInstanceOf[BaseJournalDaoWithReadMessages]
val messagesSrc =
readMessagesDao.internalBatchStream(persistenceId, 0, totalMessages, batchSize = fetchSize, None)

val eventualSum: Future[(Int, Int)] = messagesSrc.toMat(Sink.fold((0, 0)) { case ((accBatch, accTotal), seq) =>
(accBatch + 1, accTotal + seq.size)
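The fold at the end of this test counts windows and events: with 16 insert batches of 1,000 events each and a fetchSize of 100, the stream should yield (160, 16000). A minimal sketch of that counting logic, assuming plain Pekko Streams with grouped(fetchSize) standing in for the windowed batches that internalBatchStream emits (no database involved):

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

object BatchCountSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  val totalMessages = 16 * 1000 // numberOfInsertBatches * eventsPerBatch
  val fetchSize = 100

  val eventualSum = Source(1 to totalMessages)
    .grouped(fetchSize) // one immutable.Seq per emitted window
    .runWith(Sink.fold((0, 0)) { case ((batches, total), seq) =>
      (batches + 1, total + seq.size)
    })

  println(Await.result(eventualSum, 10.seconds)) // expected: (160,16000)
  system.terminate()
}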
@@ -15,9 +15,10 @@
package org.apache.pekko.persistence.jdbc.query

import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
import org.apache.pekko.actor.ExtendedActorSystem
import org.apache.pekko.actor.{ ActorRef, ExtendedActorSystem }
import org.apache.pekko.pattern.ask
import org.apache.pekko.persistence.jdbc.config.JournalConfig
import org.apache.pekko.persistence.jdbc.journal.dao.JournalDao
import org.apache.pekko.persistence.jdbc.journal.dao.{ DefaultJournalDao, JournalDao }
import org.apache.pekko.persistence.jdbc.query.JournalDaoStreamMessagesMemoryTest.fetchSize
import org.apache.pekko.persistence.{ AtomicWrite, PersistentRepr }
import org.apache.pekko.serialization.{ Serialization, SerializationExtension }
@@ -55,24 +56,10 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean

it should "stream events" in withActorSystem { implicit system =>
withDatabase { db =>
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat: Materializer = SystemMaterializer(system).materializer

val fqcn = journalConfig.pluginConfig.dao
val args = Seq(
(classOf[Database], db),
(classOf[JdbcProfile], profile),
(classOf[JournalConfig], journalConfig),
(classOf[Serialization], SerializationExtension(system)),
(classOf[ExecutionContext], ec),
(classOf[Materializer], mat))
val dao: JournalDao =
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn, args) match {
case Success(dao) => dao
case Failure(cause) => throw cause
}
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat: Materializer = SystemMaterializer(system).materializer

withDao { dao =>
val persistenceId = UUID.randomUUID().toString

val payloadSize = 5000 // 5000 bytes
@@ -102,7 +89,7 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
val atomicWrites =
(start to end).map { j =>
AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
}.toSeq
}

dao.asyncWriteMessages(atomicWrites).map(_ => i)
}
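The dropped .toSeq in the last hunk above is a behavioural no-op: mapping over a Range already yields an immutable.IndexedSeq, which satisfies the immutable.Seq[AtomicWrite] that asyncWriteMessages takes in the journal plugin API. A quick self-contained check of that assumption:

import scala.collection.immutable

object ToSeqCheck extends App {
  // Mapping a Range produces an immutable.IndexedSeq, already an immutable.Seq,
  // so appending .toSeq added nothing but an extra call.
  val writes = (1 to 3).map(j => s"write-$j")
  val asSeq: immutable.Seq[String] = writes // compiles without .toSeq
  println(asSeq)
}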
@@ -14,31 +14,32 @@

package org.apache.pekko.persistence.jdbc.query

import com.typesafe.config.ConfigValue
import org.apache.pekko
import pekko.actor.{ ActorRef, ActorSystem, Props, Stash, Status }
import pekko.pattern.ask
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.JdbcProfile
import scala.concurrent.duration.{FiniteDuration, _}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import pekko.actor.{ActorRef, ActorSystem, ExtendedActorSystem, Props, Stash, Status}
import pekko.event.LoggingReceive
import pekko.persistence.{ DeleteMessagesFailure, DeleteMessagesSuccess, PersistentActor }
import pekko.pattern.ask
import pekko.persistence.jdbc.SingleActorSystemPerTestSpec
import pekko.persistence.jdbc.query.EventAdapterTest.{ Event, TaggedAsyncEvent, TaggedEvent }
import pekko.persistence.jdbc.query.javadsl.{ JdbcReadJournal => JavaJdbcReadJournal }
import pekko.persistence.jdbc.config.JournalConfig
import pekko.persistence.jdbc.journal.dao.JournalDao
import pekko.persistence.jdbc.query.EventAdapterTest.{Event, TaggedAsyncEvent, TaggedEvent}
import pekko.persistence.jdbc.query.javadsl.{JdbcReadJournal => JavaJdbcReadJournal}
import pekko.persistence.jdbc.query.scaladsl.JdbcReadJournal
import pekko.persistence.jdbc.testkit.internal._
import pekko.persistence.journal.Tagged
import pekko.persistence.query.{ EventEnvelope, Offset, PersistenceQuery }
import pekko.persistence.query.{EventEnvelope, Offset, PersistenceQuery}
import pekko.persistence.{DeleteMessagesFailure, DeleteMessagesSuccess, PersistentActor}
import pekko.serialization.{Serialization, SerializationExtension}
import pekko.stream.scaladsl.Sink
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.javadsl.{ TestSink => JavaSink }
import pekko.stream.testkit.javadsl.{TestSink => JavaSink}
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.{ Materializer, SystemMaterializer }
import com.typesafe.config.ConfigValue

import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.{ FiniteDuration, _ }
import pekko.persistence.jdbc.testkit.internal.H2
import pekko.persistence.jdbc.testkit.internal.MySQL
import pekko.persistence.jdbc.testkit.internal.Oracle
import pekko.persistence.jdbc.testkit.internal.Postgres
import pekko.persistence.jdbc.testkit.internal.SqlServer
import pekko.stream.{Materializer, SystemMaterializer}

trait ReadJournalOperations {
def withCurrentPersistenceIds(within: FiniteDuration = 60.second)(f: TestSubscriber.Probe[String] => Unit): Unit
@@ -337,6 +338,23 @@ abstract class QueryTestSpec(config: String, configOverrides: Map[String, Config

def withTags(payload: Any, tags: String*) = Tagged(payload, Set(tags: _*))

def withDao(f: JournalDao => Unit)(implicit system: ActorSystem, ec: ExecutionContext, mat: Materializer): Unit = {
val fqcn = journalConfig.pluginConfig.dao
val args = Seq(
(classOf[Database], db),
(classOf[JdbcProfile], profile),
(classOf[JournalConfig], journalConfig),
(classOf[Serialization], SerializationExtension(system)),
(classOf[ExecutionContext], ec),
(classOf[Materializer], mat))
val journalDao: JournalDao =
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn, args) match {
case Success(dao) => dao
case Failure(cause) => throw cause
}
f(journalDao)
}

}

trait PostgresCleaner extends QueryTestSpec {
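The new withDao helper centralises the reflective DAO construction that both tests previously inlined: Pekko's DynamicAccess loads the class named by journalConfig.pluginConfig.dao and matches the (Class, value) argument pairs against a constructor of that class. A minimal sketch of that mechanism, using a hypothetical MyDao class in place of a real journal DAO:

import org.apache.pekko.actor.{ ActorSystem, ExtendedActorSystem }

import scala.util.{ Failure, Success }

class MyDao(val label: String) // hypothetical one-argument DAO

object DynamicAccessSketch extends App {
  val system = ActorSystem("sketch")
  val dynamicAccess = system.asInstanceOf[ExtendedActorSystem].dynamicAccess

  // The (Class, value) pairs must line up with a constructor of the target FQCN.
  val args = Seq((classOf[String], "hello"))
  dynamicAccess.createInstanceFor[MyDao]("MyDao", args) match {
    case Success(dao)   => println(dao.label) // prints "hello"
    case Failure(cause) => throw cause
  }
  system.terminate()
}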
