From 787967330573f73756d7d8992082c447134eca3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mareks=20Ramp=C4=81ns?= <8796159+mr-git@users.noreply.github.com> Date: Tue, 5 Nov 2024 11:22:53 +0200 Subject: [PATCH 1/2] add documentation for classes which are involved in replication --- .../kafka/journal/Journal.scala | 7 +- .../kafka/journal/replicator/Batch.scala | 4 + .../journal/replicator/ReplicateRecords.scala | 4 + .../kafka/journal/replicator/Replicator.scala | 7 + .../journal/replicator/TopicReplicator.scala | 7 + .../kafka/journal/replicator/BatchSpec.scala | 130 +++++++++--------- 6 files changed, 93 insertions(+), 66 deletions(-) diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journal.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journal.scala index 06595e947..fa150950a 100644 --- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journal.scala +++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journal.scala @@ -31,11 +31,16 @@ trait Journal[F[_]] { /** * Deletes events up to provided SeqNr + * Using default value `DeleteTo.max` is not recommended: `Delete` action is valid for starting a new journal and + * the `DeleteTo.max` would produce a dead-end journal - there will be no way to add new events. */ + // TODO next major release - remove default value def delete(to: DeleteTo = DeleteTo.max): F[Option[PartitionOffset]] /** - * Deletes all data with regards to key, consecutive pointer call will return none + * Deletes all data (resets storage, removes all events and all traces of this journal) for the key, + * a subsequent pointer call will return `None`. + * Mostly used by [[PurgeExpired]]. 
*/ def purge: F[Option[PartitionOffset]] } diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala index fe418d58f..acd942b71 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala @@ -5,6 +5,10 @@ import cats.syntax.all.* import com.evolutiongaming.kafka.journal.* import com.evolutiongaming.skafka.Offset +/** + * Receives records from [[ReplicateRecords]], groups and optimizes sequential actions. + * Pays extra attention to preserve `origin` and `version` of the first `delete` action when several are merged. + */ private[journal] sealed abstract class Batch extends Product { def offset: Offset diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala index c4a5d5063..bbb90262c 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala @@ -14,6 +14,10 @@ import com.evolutiongaming.skafka.Offset import java.time.Instant import scala.concurrent.duration.FiniteDuration +/** + * Gets a list of per-key records from [[TopicReplicator]], groups them in [[Batch]]es and replicates each batch + * to Cassandra using [[ReplicatedKeyJournal]]. 
+ */ private[journal] trait ReplicateRecords[F[_]] { def apply(records: Nel[ConsRecord], timestamp: Instant): F[Int] diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala index eed3fbfe7..daab82d55 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala @@ -25,6 +25,13 @@ import scodec.bits.ByteVector import scala.concurrent.duration.* +/** + * Subscribes to Kafka and establishes session with Cassandra. + * For each topic creates instance of [[TopicReplicator]] and binds it with Cassandra client [[ReplicatedCassandra]] + * (which implements [[ReplicatedJournal]]). + * + * [[TopicReplicator]] groups messages per key and delegates them to [[ReplicateRecords]] for storage in Cassandra. + */ // TODO TEST trait Replicator[F[_]] { diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala index aa0b6d3bc..07e9a0871 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala @@ -21,6 +21,13 @@ import java.time.Instant import scala.concurrent.duration.* import scala.util.Try +/** + * Consumes the Kafka topic and "splits" the data stream into [[PartitionFlow]]s + * and "splits" each per-partition stream in [[KeyFlow]]s. 
+ * Basically: + * - result of each Kafka's `poll` gets grouped per partition and key + * - grouped per-key records are processed by [[ReplicateRecords]] + */ private[journal] object TopicReplicator { def make[F[_]: Concurrent: Sleep: ToTry: LogOf: Fail: MeasureDuration: JsonCodec]( diff --git a/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchSpec.scala b/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchSpec.scala index a0e2b8deb..a56188e61 100644 --- a/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchSpec.scala +++ b/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchSpec.scala @@ -43,113 +43,113 @@ class BatchSpec extends AnyFunSuite with Matchers { ), List(appends(4, append(offset = 1, seqNr = 1), append(offset = 3, seqNr = 2))), ), - (Nel.of(delete(offset = 1, seqNr = 1)), List(deletes(offset = 1, seqNr = 1))), - (Nel.of(mark(offset = 1), delete(offset = 2, seqNr = 1)), List(deletes(offset = 2, seqNr = 1))), - (Nel.of(delete(offset = 1, seqNr = 1), mark(offset = 2)), List(deletes(offset = 2, seqNr = 1))), + (Nel.of(delete(offset = 1, to = 1)), List(deletes(offset = 1, to = 1))), + (Nel.of(mark(offset = 1), delete(offset = 2, to = 1)), List(deletes(offset = 2, to = 1))), + (Nel.of(delete(offset = 1, to = 1), mark(offset = 2)), List(deletes(offset = 2, to = 1))), ( - Nel.of(delete(offset = 1, seqNr = 1), append(offset = 2, seqNr = 2)), - List(deletes(offset = 1, seqNr = 1), appends(2, append(offset = 2, seqNr = 2))), + Nel.of(delete(offset = 1, to = 1), append(offset = 2, seqNr = 2)), + List(deletes(offset = 1, to = 1), appends(2, append(offset = 2, seqNr = 2))), ), ( - Nel.of(append(offset = 1, seqNr = 2), delete(offset = 2, seqNr = 1)), - List(appends(1, append(offset = 1, seqNr = 2)), deletes(offset = 2, seqNr = 1)), + Nel.of(append(offset = 1, seqNr = 2), delete(offset = 2, to = 1)), + List(appends(1, append(offset = 1, seqNr = 2)), deletes(offset = 2, 
to = 1)), ), ( - Nel.of(append(offset = 1, seqNr = 1, seqNrs = 2, 3), delete(offset = 2, seqNr = 1)), - List(appends(1, append(offset = 1, seqNr = 1, seqNrs = 2, 3)), deletes(offset = 2, seqNr = 1)), + Nel.of(append(offset = 1, seqNr = 1, seqNrs = 2, 3), delete(offset = 2, to = 1)), + List(appends(1, append(offset = 1, seqNr = 1, seqNrs = 2, 3)), deletes(offset = 2, to = 1)), ), ( - Nel.of(append(offset = 1, seqNr = 1), delete(offset = 2, seqNr = 1), append(offset = 3, seqNr = 2)), - List(appends(1, append(offset = 1, seqNr = 1)), deletes(offset = 2, seqNr = 1), appends(3, append(offset = 3, seqNr = 2))), + Nel.of(append(offset = 1, seqNr = 1), delete(offset = 2, to = 1), append(offset = 3, seqNr = 2)), + List(appends(1, append(offset = 1, seqNr = 1)), deletes(offset = 2, to = 1), appends(3, append(offset = 3, seqNr = 2))), ), ( Nel.of( append(offset = 1, seqNr = 1), - delete(offset = 2, seqNr = 1, origin = "origin1"), + delete(offset = 2, to = 1, origin = "origin1"), append(offset = 3, seqNr = 2), - delete(offset = 4, seqNr = 2, origin = "origin2"), + delete(offset = 4, to = 2, origin = "origin2"), ), - List(appends(3, append(offset = 3, seqNr = 2)), deletes(offset = 4, seqNr = 2, origin = "origin1")), + List(appends(3, append(offset = 3, seqNr = 2)), deletes(offset = 4, to = 2, origin = "origin1")), ), ( Nel.of( append(offset = 1, seqNr = 1), - delete(offset = 2, seqNr = 1, origin = "origin"), + delete(offset = 2, to = 1, origin = "origin"), append(offset = 3, seqNr = 2), - delete(offset = 4, seqNr = 2), + delete(offset = 4, to = 2), ), - List(appends(3, append(offset = 3, seqNr = 2)), deletes(offset = 4, seqNr = 2, origin = "origin")), + List(appends(3, append(offset = 3, seqNr = 2)), deletes(offset = 4, to = 2, origin = "origin")), ), ( Nel.of( append(offset = 1, seqNr = 1), append(offset = 2, seqNr = 2), - delete(offset = 3, seqNr = 1, origin = "origin1"), - delete(offset = 4, seqNr = 2, origin = "origin2"), + delete(offset = 3, to = 1, origin = "origin1"), + 
delete(offset = 4, to = 2, origin = "origin2"), ), - List(appends(2, append(offset = 2, seqNr = 2)), deletes(offset = 4, seqNr = 2, origin = "origin1")), + List(appends(2, append(offset = 2, seqNr = 2)), deletes(offset = 4, to = 2, origin = "origin1")), ), ( Nel.of( append(offset = 1, seqNr = 1), append(offset = 2, seqNr = 2), - delete(offset = 3, seqNr = 1), - delete(offset = 4, seqNr = 2, origin = "origin"), + delete(offset = 3, to = 1), + delete(offset = 4, to = 2, origin = "origin"), ), List( appends(offset = 2, append(offset = 2, seqNr = 2)), - deletes(offset = 4, seqNr = 2, origin = "origin"), + deletes(offset = 4, to = 2, origin = "origin"), ), ), - (Nel.of(delete(offset = 2, seqNr = 1), delete(offset = 3, seqNr = 2)), List(deletes(offset = 3, seqNr = 2))), + (Nel.of(delete(offset = 2, to = 1), delete(offset = 3, to = 2)), List(deletes(offset = 3, to = 2))), ( - Nel.of(delete(offset = 2, seqNr = 2, origin = "origin"), delete(offset = 3, seqNr = 1)), - List(deletes(offset = 3, seqNr = 2, origin = "origin")), + Nel.of(delete(offset = 2, to = 2, origin = "origin"), delete(offset = 3, to = 1)), + List(deletes(offset = 3, to = 2, origin = "origin")), ), ( Nel.of( mark(offset = 2), - delete(offset = 3, seqNr = 1, origin = "origin"), + delete(offset = 3, to = 1, origin = "origin"), mark(offset = 4), - delete(offset = 5, seqNr = 2), + delete(offset = 5, to = 2), mark(offset = 6), ), - List(deletes(offset = 6, seqNr = 2, origin = "origin")), + List(deletes(offset = 6, to = 2, origin = "origin")), ), ( Nel.of( append(offset = 0, seqNr = 1), - delete(offset = 1, seqNr = 1), + delete(offset = 1, to = 1), append(offset = 2, seqNr = 2), - delete(offset = 3, seqNr = 2), + delete(offset = 3, to = 2), append(offset = 4, seqNr = 3), ), - List(appends(2, append(offset = 2, seqNr = 2)), deletes(offset = 3, seqNr = 2), appends(4, append(offset = 4, seqNr = 3))), + List(appends(2, append(offset = 2, seqNr = 2)), deletes(offset = 3, to = 2), appends(4, append(offset = 4, seqNr = 
3))), ), ( Nel.of( append(offset = 0, seqNr = 1), append(offset = 1, seqNr = 2), - delete(offset = 2, seqNr = 1), + delete(offset = 2, to = 1), append(offset = 3, seqNr = 3), - delete(offset = 4, seqNr = 3), + delete(offset = 4, to = 3), append(offset = 5, seqNr = 4), ), - List(appends(3, append(offset = 3, seqNr = 3)), deletes(offset = 4, seqNr = 3), appends(5, append(offset = 5, seqNr = 4))), + List(appends(3, append(offset = 3, seqNr = 3)), deletes(offset = 4, to = 3), appends(5, append(offset = 5, seqNr = 4))), ), ( Nel.of( append(offset = 0, seqNr = 1), append(offset = 1, seqNr = 2), mark(offset = 2), - delete(offset = 3, seqNr = 1), + delete(offset = 3, to = 1), append(offset = 4, seqNr = 3), append(offset = 5, seqNr = 4), mark(offset = 6), ), List( appends(2, append(offset = 0, seqNr = 1), append(offset = 1, seqNr = 2)), - deletes(offset = 3, seqNr = 1), + deletes(offset = 3, to = 1), appends(6, append(offset = 4, seqNr = 3), append(offset = 5, seqNr = 4)), ), ), @@ -158,17 +158,17 @@ class BatchSpec extends AnyFunSuite with Matchers { append(offset = 0, seqNr = 1), append(offset = 1, seqNr = 2), append(offset = 2, seqNr = 3), - delete(offset = 3, seqNr = 1, origin = "origin"), + delete(offset = 3, to = 1, origin = "origin"), append(offset = 4, seqNr = 4), append(offset = 5, seqNr = 5), - delete(offset = 6, seqNr = 2), + delete(offset = 6, to = 2), append(offset = 7, seqNr = 6), ), List( appends(2, append(offset = 0, seqNr = 1), append(offset = 1, seqNr = 2), append(offset = 2, seqNr = 3)), - deletes(offset = 3, seqNr = 1, origin = "origin"), + deletes(offset = 3, to = 1, origin = "origin"), appends(5, append(offset = 4, seqNr = 4), append(offset = 5, seqNr = 5)), - deletes(offset = 6, seqNr = 2), + deletes(offset = 6, to = 2), appends(7, append(offset = 7, seqNr = 6)), ), ), @@ -177,10 +177,10 @@ class BatchSpec extends AnyFunSuite with Matchers { append(offset = 0, seqNr = 1, seqNrs = 2), append(offset = 1, seqNr = 3, seqNrs = 4), append(offset = 2, seqNr 
= 5), - delete(offset = 3, seqNr = 1), + delete(offset = 3, to = 1), append(offset = 4, seqNr = 6), append(offset = 5, seqNr = 7), - delete(offset = 6, seqNr = 3), + delete(offset = 6, to = 3), append(offset = 7, seqNr = 8), ), List( @@ -190,9 +190,9 @@ class BatchSpec extends AnyFunSuite with Matchers { append(offset = 1, seqNr = 3, seqNrs = 4), append(offset = 2, seqNr = 5), ), - deletes(offset = 3, seqNr = 1), + deletes(offset = 3, to = 1), appends(5, append(offset = 4, seqNr = 6), append(offset = 5, seqNr = 7)), - deletes(offset = 6, seqNr = 3), + deletes(offset = 6, to = 3), appends(7, append(offset = 7, seqNr = 8)), ), ), @@ -209,46 +209,46 @@ class BatchSpec extends AnyFunSuite with Matchers { Nel.of(purge(offset = 0), append(offset = 1, seqNr = 1)), List(purges(offset = 0), appends(1, append(offset = 1, seqNr = 1))), ), - (Nel.of(delete(offset = 0, seqNr = 1), purge(offset = 1)), List(purges(offset = 1))), + (Nel.of(delete(offset = 0, to = 1), purge(offset = 1)), List(purges(offset = 1))), ( - Nel.of(purge(offset = 0), delete(offset = 1, seqNr = 1)), - List(purges(offset = 0), deletes(offset = 1, seqNr = 1)), + Nel.of(purge(offset = 0), delete(offset = 1, to = 1)), + List(purges(offset = 0), deletes(offset = 1, to = 1)), ), ( - Nel.of(delete(offset = 0, seqNr = 1), delete(offset = 1, seqNr = 2)), - List(deletes(offset = 1, seqNr = 2)), + Nel.of(delete(offset = 0, to = 1), delete(offset = 1, to = 2)), + List(deletes(offset = 1, to = 2)), ), ( Nel.of( append(offset = 0, seqNr = 1, seqNrs = 2), append(offset = 1, seqNr = 3, seqNrs = 4), append(offset = 2, seqNr = 5, seqNrs = 6), - delete(offset = 3, seqNr = 3), - delete(offset = 4, seqNr = 5), + delete(offset = 3, to = 3), + delete(offset = 4, to = 5), ), List( appends(offset = 2, append(offset = 2, seqNr = 5, seqNrs = 6)), - deletes(offset = 4, seqNr = 5), + deletes(offset = 4, to = 5), ), ), ( Nel.of( append(offset = 0, seqNr = 1, seqNrs = 2, 3, 4, 5, 6), - delete(offset = 1, seqNr = 3), - delete(offset = 2, 
seqNr = 6), + delete(offset = 1, to = 3), + delete(offset = 2, to = 6), ), List( appends(offset = 0, append(offset = 0, seqNr = 1, seqNrs = 2, 3, 4, 5, 6)), - deletes(offset = 2, seqNr = 6), + deletes(offset = 2, to = 6), ), ), ( Nel.of( - delete(offset = 1, seqNr = 10), - delete(offset = 2, seqNr = 6), + delete(offset = 1, to = 10), + delete(offset = 2, to = 6), ), List( - deletes(offset = 2, seqNr = 10), + deletes(offset = 2, to = 10), ), ), ( @@ -256,11 +256,11 @@ class BatchSpec extends AnyFunSuite with Matchers { append(offset = 1797039, seqNr = 574), append(offset = 1801629, seqNr = 575), mark(offset = 1801632), - delete(offset = 1801642, seqNr = 575), + delete(offset = 1801642, to = 575), ), List( appends(offset = 1801632, append(offset = 1801629, seqNr = 575)), - deletes(offset = 1801642, seqNr = 575), + deletes(offset = 1801642, to = 575), ), ), ) @@ -282,8 +282,8 @@ class BatchSpec extends AnyFunSuite with Matchers { Batch.Appends(Offset.unsafe(offset), appends) } - def deletes(offset: Int, seqNr: Int, origin: String = ""): Batch.Delete = { - Batch.Delete(Offset.unsafe(offset), SeqNr.unsafe(seqNr).toDeleteTo, originOf(origin), version = none) + def deletes(offset: Int, to: Int, origin: String = ""): Batch.Delete = { + Batch.Delete(Offset.unsafe(offset), SeqNr.unsafe(to).toDeleteTo, originOf(origin), version = none) } def purges(offset: Int, origin: String = ""): Batch.Purge = { @@ -294,8 +294,8 @@ class BatchSpec extends AnyFunSuite with Matchers { A.Append(offset = offset, seqNr = seqNr, seqNrs = seqNrs.toList) } - def delete(offset: Int, seqNr: Int, origin: String = ""): A = { - A.Delete(offset = offset, seqNr = seqNr, origin = origin) + def delete(offset: Int, to: Int, origin: String = ""): A = { + A.Delete(offset = offset, seqNr = to, origin = origin) } def mark(offset: Int): A = { From 74382cae0f96abd50f11cb8b0a826b581104ae46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mareks=20Ramp=C4=81ns?= <8796159+mr-git@users.noreply.github.com> Date: Tue, 5 Nov 
2024 11:26:19 +0200 Subject: [PATCH 2/2] add module for explicit support of benchmarking --- .../journal/replicator/BatchBenchmark.scala | 346 ++++++++++++++++++ .../journal/replicator/Batch_4_1_0.scala | 172 +++++++++ .../Batch_4_1_0_Alternative_with_Vector.scala | 114 ++++++ ...ernative_with_Aggressive_Reshuffling.scala | 136 +++++++ build.sbt | 10 + project/plugins.sbt | 2 + 6 files changed, 780 insertions(+) create mode 100644 benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchBenchmark.scala create mode 100644 benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_4_1_0.scala create mode 100644 benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_4_1_0_Alternative_with_Vector.scala create mode 100644 benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_Alternative_with_Aggressive_Reshuffling.scala diff --git a/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchBenchmark.scala b/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchBenchmark.scala new file mode 100644 index 000000000..ec5156f15 --- /dev/null +++ b/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchBenchmark.scala @@ -0,0 +1,346 @@ +package com.evolutiongaming.kafka.journal.replicator + +import cats.data.NonEmptyList as Nel +import cats.syntax.all.* +import com.evolutiongaming.kafka.journal.* +import com.evolutiongaming.skafka.Offset +import scodec.bits.ByteVector +import org.openjdk.jmh.annotations.{Benchmark, Fork, Measurement, Scope, State, Warmup} +import org.openjdk.jmh.infra.Blackhole + +import java.time.Instant +import java.util.concurrent.TimeUnit + +/** To run benchmarks: {{{sbt benchmark/Jmh/run com.evolutiongaming.kafka.journal.replicator.BatchBenchmark}}} + */ +@State(Scope.Benchmark) +@Fork(1) +@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS) 
+class BatchBenchmark { + + @Benchmark + def original(blackhole: Blackhole): Unit = { + val of = Batch_4_1_0.of + callAll[Batch_4_1_0](blackhole, of) + } + + @Benchmark + def alternativeWithVector(blackhole: Blackhole): Unit = { + val of = Batch_4_1_0_Alternative_with_Vector.of + callAll[Batch_4_1_0_Alternative_with_Vector](blackhole, of) + } + + @Benchmark + def alternative4_1_2(blackhole: Blackhole): Unit = { + val of = Batch_Alternative_with_Aggressive_Reshuffling.of + callAll[Batch_Alternative_with_Aggressive_Reshuffling](blackhole, of) + } + + def callAll[T](blackhole: Blackhole, of: Nel[ActionRecord[Action]] => List[T]): Unit = { + blackhole.consume(of(b1.map(actionRecordOf))) + blackhole.consume(of(b2.map(actionRecordOf))) + blackhole.consume(of(b3.map(actionRecordOf))) + blackhole.consume(of(b4.map(actionRecordOf))) + blackhole.consume(of(b5.map(actionRecordOf))) + blackhole.consume(of(b6.map(actionRecordOf))) + blackhole.consume(of(b7.map(actionRecordOf))) + blackhole.consume(of(b8.map(actionRecordOf))) + blackhole.consume(of(b9.map(actionRecordOf))) + blackhole.consume(of(b10.map(actionRecordOf))) + blackhole.consume(of(b11.map(actionRecordOf))) + blackhole.consume(of(b12.map(actionRecordOf))) + blackhole.consume(of(b13.map(actionRecordOf))) + blackhole.consume(of(b14.map(actionRecordOf))) + blackhole.consume(of(b15.map(actionRecordOf))) + blackhole.consume(of(b16.map(actionRecordOf))) + blackhole.consume(of(b17.map(actionRecordOf))) + blackhole.consume(of(b18.map(actionRecordOf))) + blackhole.consume(of(b19.map(actionRecordOf))) + blackhole.consume(of(b20.map(actionRecordOf))) + blackhole.consume(of(b21.map(actionRecordOf))) + blackhole.consume(of(b22.map(actionRecordOf))) + blackhole.consume(of(b23.map(actionRecordOf))) + blackhole.consume(of(b24.map(actionRecordOf))) + blackhole.consume(of(b25.map(actionRecordOf))) + blackhole.consume(of(b26.map(actionRecordOf))) + blackhole.consume(of(b27.map(actionRecordOf))) + 
blackhole.consume(of(b28.map(actionRecordOf))) + blackhole.consume(of(b29.map(actionRecordOf))) + blackhole.consume(of(b30.map(actionRecordOf))) + blackhole.consume(of(b31.map(actionRecordOf))) + blackhole.consume(of(b32.map(actionRecordOf))) + blackhole.consume(of(b33.map(actionRecordOf))) + blackhole.consume(of(b34.map(actionRecordOf))) + blackhole.consume(of(b35.map(actionRecordOf))) + blackhole.consume(of(b36.map(actionRecordOf))) + blackhole.consume(of(b37.map(actionRecordOf))) + blackhole.consume(of(b38.map(actionRecordOf))) + blackhole.consume(of(b39.map(actionRecordOf))) + blackhole.consume(of(b40.map(actionRecordOf))) + blackhole.consume(of(b41.map(actionRecordOf))) + blackhole.consume(of(b42.map(actionRecordOf))) + blackhole.consume(of(b43.map(actionRecordOf))) + blackhole.consume(of(b44.map(actionRecordOf))) + blackhole.consume(of(b45.map(actionRecordOf))) + } + + def b1: Nel[A] = Nel.of(mark(offset = 0)) + def b2: Nel[A] = Nel.of(mark(offset = 0), mark(offset = 1)) + def b3: Nel[A] = Nel.of(append(offset = 0, seqNr = 1)) + def b4: Nel[A] = Nel.of(append(offset = 0, seqNr = 1, seqNrs = 2)) + def b5: Nel[A] = Nel.of(append(offset = 0, seqNr = 1, seqNrs = 2), append(offset = 1, seqNr = 3, seqNrs = 4)) + def b6: Nel[A] = Nel.of(append(offset = 0, seqNr = 1), mark(offset = 1)) + def b7: Nel[A] = Nel.of(mark(offset = 0), append(offset = 1, seqNr = 1)) + def b8: Nel[A] = Nel.of(append(offset = 0, seqNr = 1), append(offset = 1, seqNr = 2)) + def b9: Nel[A] = Nel.of( + mark(offset = 0), + append(offset = 1, seqNr = 1), + mark(offset = 2), + append(offset = 3, seqNr = 2), + mark(offset = 4), + ) + def b10: Nel[A] = Nel.of(delete(offset = 1, to = 1)) + def b11: Nel[A] = Nel.of(mark(offset = 1), delete(offset = 2, to = 1)) + def b12: Nel[A] = Nel.of(delete(offset = 1, to = 1), mark(offset = 2)) + def b13: Nel[A] = Nel.of(delete(offset = 1, to = 1), append(offset = 2, seqNr = 2)) + def b14: Nel[A] = Nel.of(append(offset = 1, seqNr = 2), delete(offset = 2, to = 1)) + 
def b15: Nel[A] = Nel.of(append(offset = 1, seqNr = 1, seqNrs = 2, 3), delete(offset = 2, to = 1)) + def b16: Nel[A] = Nel.of(append(offset = 1, seqNr = 1), delete(offset = 2, to = 1), append(offset = 3, seqNr = 2)) + def b17: Nel[A] = Nel.of( + append(offset = 1, seqNr = 1), + delete(offset = 2, to = 1, origin = "origin1"), + append(offset = 3, seqNr = 2), + delete(offset = 4, to = 2, origin = "origin2"), + ) + def b18: Nel[A] = Nel.of( + append(offset = 1, seqNr = 1), + delete(offset = 2, to = 1, origin = "origin"), + append(offset = 3, seqNr = 2), + delete(offset = 4, to = 2), + ) + def b19: Nel[A] = Nel.of( + append(offset = 1, seqNr = 1), + append(offset = 2, seqNr = 2), + delete(offset = 3, to = 1, origin = "origin1"), + delete(offset = 4, to = 2, origin = "origin2"), + ) + def b20: Nel[A] = Nel.of( + append(offset = 1, seqNr = 1), + append(offset = 2, seqNr = 2), + delete(offset = 3, to = 1), + delete(offset = 4, to = 2, origin = "origin"), + ) + def b21: Nel[A] = Nel.of(delete(offset = 2, to = 1), delete(offset = 3, to = 2)) + def b22: Nel[A] = Nel.of(delete(offset = 2, to = 2, origin = "origin"), delete(offset = 3, to = 1)) + def b23: Nel[A] = Nel.of( + mark(offset = 2), + delete(offset = 3, to = 1, origin = "origin"), + mark(offset = 4), + delete(offset = 5, to = 2), + mark(offset = 6), + ) + def b24: Nel[A] = Nel.of( + append(offset = 0, seqNr = 1), + delete(offset = 1, to = 1), + append(offset = 2, seqNr = 2), + delete(offset = 3, to = 2), + append(offset = 4, seqNr = 3), + ) + def b25: Nel[A] = Nel.of( + append(offset = 0, seqNr = 1), + append(offset = 1, seqNr = 2), + delete(offset = 2, to = 1), + append(offset = 3, seqNr = 3), + delete(offset = 4, to = 3), + append(offset = 5, seqNr = 4), + ) + def b26: Nel[A] = Nel.of( + append(offset = 0, seqNr = 1), + append(offset = 1, seqNr = 2), + mark(offset = 2), + delete(offset = 3, to = 1), + append(offset = 4, seqNr = 3), + append(offset = 5, seqNr = 4), + mark(offset = 6), + ) + def b27: Nel[A] = Nel.of( 
+ append(offset = 0, seqNr = 1), + append(offset = 1, seqNr = 2), + append(offset = 2, seqNr = 3), + delete(offset = 3, to = 1, origin = "origin"), + append(offset = 4, seqNr = 4), + append(offset = 5, seqNr = 5), + delete(offset = 6, to = 2), + append(offset = 7, seqNr = 6), + ) + def b28: Nel[A] = Nel.of( + append(offset = 0, seqNr = 1, seqNrs = 2), + append(offset = 1, seqNr = 3, seqNrs = 4), + append(offset = 2, seqNr = 5), + delete(offset = 3, to = 1), + append(offset = 4, seqNr = 6), + append(offset = 5, seqNr = 7), + delete(offset = 6, to = 3), + append(offset = 7, seqNr = 8), + ) + def b29: Nel[A] = Nel.of(purge(offset = 0)) + def b30: Nel[A] = Nel.of(mark(offset = 0), purge(offset = 1)) + def b31: Nel[A] = Nel.of(purge(offset = 0), mark(offset = 1)) + def b32: Nel[A] = Nel.of(purge(offset = 0, origin = "origin"), mark(offset = 1), purge(offset = 2)) + def b33: Nel[A] = Nel.of(purge(offset = 0, origin = "origin0"), mark(offset = 1), purge(offset = 2, origin = "origin")) + def b34: Nel[A] = Nel.of(append(offset = 0, seqNr = 1), purge(offset = 1)) + def b35: Nel[A] = Nel.of(purge(offset = 0), append(offset = 1, seqNr = 1)) + def b36: Nel[A] = Nel.of(delete(offset = 0, to = 1), purge(offset = 1)) + def b37: Nel[A] = Nel.of(purge(offset = 0), delete(offset = 1, to = 1)) + def b38: Nel[A] = Nel.of(delete(offset = 0, to = 1), delete(offset = 1, to = 2)) + def b39: Nel[A] = Nel.of( + append(offset = 0, seqNr = 1, seqNrs = 2), + append(offset = 1, seqNr = 3, seqNrs = 4), + append(offset = 2, seqNr = 5, seqNrs = 6), + delete(offset = 3, to = 3), + delete(offset = 4, to = 5), + ) + def b40: Nel[A] = Nel.of( + append(offset = 0, seqNr = 1, seqNrs = 2, 3, 4, 5, 6), + delete(offset = 1, to = 3), + delete(offset = 2, to = 6), + ) + def b41: Nel[A] = Nel.of( + append(offset = 0, seqNr = 1, seqNrs = 2, 3, 4), + append(offset = 1, seqNr = 5, seqNrs = 6), + delete(offset = 2, to = 3), + delete(offset = 3, to = 6), + ) + def b42: Nel[A] = Nel.of( + append(offset = 0, seqNr = 
1, seqNrs = 2, 3, 4), + append(offset = 1, seqNr = 5, seqNrs = 6), + delete(offset = 2, to = 3), + ) + def b43: Nel[A] = Nel.of( + delete(offset = 1, to = 10), + delete(offset = 2, to = 6), + ) + def b44: Nel[A] = Nel.of( // state: delete_to = 384, seqNr = 573 (in Cassandra) + append(offset = 1797039, seqNr = 574), + append(offset = 1801629, seqNr = 575), + mark(offset = 1801632), + delete(offset = 1801642, to = 575), + ) + + def b45: Nel[A] = Nel.of( // state: delete_to = 384, seqNr = 573 (in Cassandra) + append(offset = 1, seqNr = 1), + append(offset = 2, seqNr = 2), + append(offset = 3, seqNr = 3), + append(offset = 4, seqNr = 4), + append(offset = 5, seqNr = 5), + append(offset = 6, seqNr = 6), + append(offset = 7, seqNr = 7), + append(offset = 8, seqNr = 8), + append(offset = 9, seqNr = 9), + delete(offset = 10, to = 5), + append(offset = 11, seqNr = 11), + append(offset = 12, seqNr = 12), + append(offset = 13, seqNr = 13), + append(offset = 14, seqNr = 14), + append(offset = 15, seqNr = 15), + append(offset = 16, seqNr = 16), + append(offset = 17, seqNr = 17), + append(offset = 18, seqNr = 18), + append(offset = 19, seqNr = 19), + delete(offset = 20, to = 15), + append(offset = 21, seqNr = 21), + append(offset = 22, seqNr = 22), + append(offset = 23, seqNr = 23), + append(offset = 24, seqNr = 24), + append(offset = 25, seqNr = 25), + append(offset = 26, seqNr = 26), + append(offset = 27, seqNr = 27), + append(offset = 28, seqNr = 28), + append(offset = 29, seqNr = 29), + ) + + private val keyOf = Key(id = "id", topic = "topic") + + private val timestamp = Instant.now() + + def append(offset: Int, seqNr: Int, seqNrs: Int*): A.Append = { + A.Append(offset = offset, seqNr = seqNr, seqNrs = seqNrs.toList) + } + + def delete(offset: Int, to: Int, origin: String = ""): A = { + A.Delete(offset = offset, seqNr = to, origin = origin) + } + + def mark(offset: Int): A = { + A.Mark(offset = offset) + } + + def purge(offset: Int, origin: String = ""): A = { + 
A.Purge(offset = offset, origin = origin) + } + + def seqNrOf(value: Int): SeqNr = SeqNr.unsafe(value) + + def originOf(origin: String): Option[Origin] = { + if (origin.isEmpty) none else Origin(origin).some + } + + def appendOf(seqNrs: Nel[Int]): Action.Append = { + Action.Append( + key = keyOf, + timestamp = timestamp, + header = ActionHeader.Append( + range = SeqRange(seqNrOf(seqNrs.head), seqNrOf(seqNrs.last)), + payloadType = PayloadType.Binary, + origin = none, + version = none, + metadata = HeaderMetadata.empty, + ), + payload = ByteVector.empty, + headers = Headers.empty, + ) + } + + def deleteOf(seqNr: Int, origin: String): Action.Delete = { + Action.Delete(keyOf, timestamp, seqNrOf(seqNr).toDeleteTo, originOf(origin), version = none) + } + + def actionOf(a: A): Action = { + a match { + case a: A.Append => appendOf(Nel(a.seqNr, a.seqNrs)) + case a: A.Delete => deleteOf(seqNr = a.seqNr, origin = a.origin) + case a: A.Purge => Action.Purge(keyOf, timestamp, origin = originOf(a.origin), version = none) + case _: A.Mark => Action.Mark(keyOf, timestamp, ActionHeader.Mark("id", none, version = none)) + } + } + + def actionRecordOf(a: A): ActionRecord[Action] = { + val action = actionOf(a) + actionRecordOf(action, a.offset) + } + + def actionRecordOf[T <: Action](action: T, offset: Int): ActionRecord[T] = { + ActionRecord(action, PartitionOffset(offset = Offset.unsafe(offset))) + } + + sealed trait A { + def offset: Int + } + + object A { + + case class Append(offset: Int, seqNr: Int, seqNrs: List[Int]) extends A { + override def toString: String = { + val range = seqNrs.lastOption.fold(seqNr.toString) { to => s"$seqNr..$to" } + s"$productPrefix($offset,$range)" + } + } + + case class Delete(offset: Int, seqNr: Int, origin: String) extends A + + case class Mark(offset: Int) extends A + + case class Purge(offset: Int, origin: String) extends A + } +} diff --git a/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_4_1_0.scala 
b/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_4_1_0.scala new file mode 100644 index 000000000..36da54971 --- /dev/null +++ b/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_4_1_0.scala @@ -0,0 +1,172 @@ +package com.evolutiongaming.kafka.journal.replicator + +import cats.data.NonEmptyList +import cats.syntax.all.* +import com.evolutiongaming.kafka.journal.* +import com.evolutiongaming.skafka.Offset + +/** + * Original implementation as it was in version `4.1.0` + */ +private[journal] sealed abstract class Batch_4_1_0 extends Product { + + def offset: Offset +} + +private[journal] object Batch_4_1_0 { + + def of(records: NonEmptyList[ActionRecord[Action]]): List[Batch_4_1_0] = { + + // returns `true` when we can optimize Cassandra usage by NOT inserting "previous" Append actions in DB + // because "current" Delete action will discard them all + def cut(appends: Appends, delete: Action.Delete): Boolean = { + val append = appends.records.head.action + append.range.to <= delete.to.value + } + + def dropDeleted(appends: Appends, delete: Action.Delete): Option[Appends] = { + // we cannot drop individual events in record as we do not parse the payload here (`records.head.action.payload: ByteVector`) + val validRecords = appends.records.collect { case record if record.action.range.to >= delete.to.value => record } + NonEmptyList.fromList(validRecords).map { Appends(appends.offset, _) } + } + + records + .foldLeft(List.empty[Batch_4_1_0]) { (bs, record) => + val offset = record.partitionOffset.offset + + def appendsOf(records: NonEmptyList[ActionRecord[Action.Append]]): Appends = { + Appends(offset, records) + } + + def deleteOf(to: DeleteTo, origin: Option[Origin], version: Option[Version]): Delete = { + Delete(offset, to, origin, version) + } + + def purgeOf(origin: Option[Origin], version: Option[Version]): Purge = { + Purge(offset, origin, version) + } + + def actionRecord[A <: Action](a: A): ActionRecord[A] = 
{ + record.copy(action = a) + } + + def origin: Option[Origin] = { + bs.foldRight(none[Origin]) { (b, origin) => + origin orElse { + b match { + case b: Batch_4_1_0.Delete => b.origin + case _: Batch_4_1_0.Appends => none + case b: Batch_4_1_0.Purge => b.origin + } + } + } + } + + def version: Option[Version] = { + bs.foldRight(none[Version]) { (b, version) => + version orElse { + b match { + case b: Batch_4_1_0.Delete => b.version + case _: Batch_4_1_0.Appends => none + case b: Batch_4_1_0.Purge => b.version + } + } + } + } + + bs match { + case b :: tail => + (b, record.action) match { + case (b: Appends, a: Action.Append) => + val records = actionRecord(a) :: b.records + appendsOf(records) :: tail + + case (b: Appends, _: Action.Mark) => + appendsOf(b.records) :: tail + + case (b: Appends, a: Action.Delete) => + if (cut(b, a)) { + val delete = deleteOf(a.to, origin orElse a.origin, version orElse a.version) + // preserve last `append` as we want to update `expireAfter` too, but we do not have [easy] access to it + delete :: b.copy(records = NonEmptyList.of(b.records.head)) :: Nil + } else { + val delete = deleteOf(a.to, a.origin, a.version) + delete :: bs + } + + case (_: Appends, a: Action.Purge) => + purgeOf(a.origin, a.version) :: Nil + + case (b: Delete, a: Action.Append) => + appendsOf(NonEmptyList.of(actionRecord(a))) :: b :: tail + + case (b: Delete, _: Action.Mark) => + b.copy(offset = offset) :: tail + + case (b: Delete, a: Action.Delete) => + if (a.to > b.to) { + val cleanTail: List[Batch_4_1_0] = tail.mapFilter { + case appends: Appends => dropDeleted(appends, a) + case delete: Delete => delete.some + case purge: Purge => purge.some + } + + val delete = deleteOf(a.to, b.origin orElse a.origin, b.version orElse a.version) + delete :: cleanTail + } else { + val delete = b.copy(offset = offset, origin = b.origin orElse a.origin, version = b.version orElse a.version) + delete :: tail + } + + case (_: Delete, a: Action.Purge) => + purgeOf(a.origin, 
a.version) :: Nil + + case (b: Purge, a: Action.Append) => + appendsOf(NonEmptyList.of(actionRecord(a))) :: b :: Nil + + case (b: Purge, _: Action.Mark) => + b.copy(offset = offset) :: Nil + + case (b: Purge, a: Action.Delete) => + deleteOf(a.to, a.origin, a.version) :: b :: Nil + + case (_: Purge, a: Action.Purge) => + purgeOf(a.origin, a.version) :: Nil + } + + case Nil => + record.action match { + case a: Action.Append => appendsOf(NonEmptyList.of(actionRecord(a))) :: Nil + case _: Action.Mark => Nil + case a: Action.Delete => deleteOf(a.to, a.origin, a.version) :: Nil + case a: Action.Purge => purgeOf(a.origin, a.version) :: Nil + } + } + } + .foldLeft(List.empty[Batch_4_1_0]) { (bs, b) => // reverse order of Batch_4_1_0es + b match { + case b: Appends => b.copy(records = b.records.reverse) :: bs // reverse append actions + case b: Delete => b :: bs + case b: Purge => b :: bs + } + } + } + + final case class Appends( + offset: Offset, + records: NonEmptyList[ActionRecord[Action.Append]], + ) extends Batch_4_1_0 + + final case class Delete( + offset: Offset, + to: DeleteTo, + origin: Option[Origin], + version: Option[Version], + ) extends Batch_4_1_0 + + final case class Purge( + offset: Offset, + origin: Option[Origin], + version: Option[Version], + ) extends Batch_4_1_0 +} diff --git a/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_4_1_0_Alternative_with_Vector.scala b/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_4_1_0_Alternative_with_Vector.scala new file mode 100644 index 000000000..c6d827d68 --- /dev/null +++ b/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_4_1_0_Alternative_with_Vector.scala @@ -0,0 +1,114 @@ +package com.evolutiongaming.kafka.journal.replicator + +import cats.data.NonEmptyList +import cats.syntax.all.* +import com.evolutiongaming.kafka.journal.* +import com.evolutiongaming.skafka.Offset + +/** + * Copy of `Batch` with changes: + * - change batching 
algorithm so it is easier to comprehend + * - records are aggregated within `Vector` to make append faster (line: 31) + */ +private[journal] sealed abstract class Batch_4_1_0_Alternative_with_Vector extends Product { + + def offset: Offset +} + +private[journal] object Batch_4_1_0_Alternative_with_Vector { + + def of(records: NonEmptyList[ActionRecord[Action]]): List[Batch_4_1_0_Alternative_with_Vector] = { + + val actions = records + .foldLeft(Vector.empty[ActionRecord[Action]]) { // CHANGE: use `Vector` + // drop all `Mark` actions + case (acc, ActionRecord(_: Action.Mark, _)) => acc + + // drop all actions before `Purge` + case (_, r @ ActionRecord(_: Action.Purge, _)) => Vector(r) + + // collect `Append` and `Delete` actions + case (acc, record) => acc :+ record // CHANGE: use `prepend` + } + .groupBy { record => + record.action match { + case _: Action.Mark => "M" // cannot be - we remove them in previous step + case _: Action.Purge => "P" + case _: Action.Delete => "D" + case _: Action.Append => "A" + } + } + + val purge = actions.get("P").flatMap { + // can be at most one + _.headOption.flatMap { record => + record.action match { + case purge: Action.Purge => Purge(record.offset, purge.origin, purge.version).some + case _ => none + } + } + } + + val delete0 = actions.get("D").map { deletes => + // take `Delete` action with largest `seqNr` + val actions = deletes.collect { case ActionRecord(a: Action.Delete, po) => ActionRecord(a, po) } // recover type + val delete = actions.reduceLeft { (a, b) => + if (a.action.to.value > b.action.to.value) a + else { + // TODO MR here we expect that in `metajournal` we save: highest `seqNr` with first `origin` - is it important? 
+ // if not, then alternative code could be simple: `b` + val origin = a.action.header.origin.orElse(b.action.header.origin) + b.copy(action = b.action.copy(header = b.action.header.copy(origin = origin))) + } + } + Delete(delete.offset, delete.action.to, delete.action.origin, delete.action.version) + } + + val appends = actions.get("A").flatMap { appends => + // merge all `Append`s + val deleteTo = delete0.map(_.to.value) + val actions0 = appends.collect { + // drop to be deleted `Append`s, except last one - we want to save its expiration in `metajournal` + case ActionRecord(a: Action.Append, po) if deleteTo.forall(_ <= a.range.to) => ActionRecord(a, po) + } + + // we can drop first `append`, if `deleteTo` will discard it AND there is at least one more `append` + val actions = actions0.headOption match { + case Some(head) if deleteTo.contains(head.action.range.to) && actions0.tail.nonEmpty => actions0.tail + case _ => actions0 + } + + NonEmptyList.fromList(actions.toList) match { // CHANGE: use `.toList` + case Some(actions) => Appends(appends.last.offset, actions).some + case None => none + } + } + + // if `delete` was not last action, adjust `delete`'s batch offset to update `metajournal` correctly + val delete = appends match { + case Some(appends) => delete0.map(delete => delete.copy(offset = delete.offset max appends.offset)) + case None => delete0 + } + + // apply action batches in order: `Purge`, `Append`s and `Delete` + List(purge, appends, delete).flatten + } + + final case class Appends( + offset: Offset, + records: NonEmptyList[ActionRecord[Action.Append]], + ) extends Batch_4_1_0_Alternative_with_Vector + + final case class Delete( + offset: Offset, + to: DeleteTo, + origin: Option[Origin], + version: Option[Version], + ) extends Batch_4_1_0_Alternative_with_Vector + + final case class Purge( + offset: Offset, + origin: Option[Origin], + version: Option[Version], + ) extends Batch_4_1_0_Alternative_with_Vector +} diff --git 
a/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_Alternative_with_Aggressive_Reshuffling.scala b/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_Alternative_with_Aggressive_Reshuffling.scala new file mode 100644 index 000000000..3fd3e51dc --- /dev/null +++ b/benchmark/src/test/scala/com/evolutiongaming/kafka/journal/replicator/Batch_Alternative_with_Aggressive_Reshuffling.scala @@ -0,0 +1,136 @@ +package com.evolutiongaming.kafka.journal.replicator + +import cats.data.NonEmptyList +import cats.syntax.all.* +import com.evolutiongaming.kafka.journal.* +import com.evolutiongaming.skafka.Offset + +/** + * Copy of `Batch` with changes: + * - aggressive batching algorithm based on some assumptions with reshuffling of actions + */ +private[journal] sealed abstract class Batch_Alternative_with_Aggressive_Reshuffling extends Product { + + def offset: Offset +} + +private[journal] object Batch_Alternative_with_Aggressive_Reshuffling { + + def of(records: NonEmptyList[ActionRecord[Action]]): List[Batch_Alternative_with_Aggressive_Reshuffling] = { + State(records).batches + } + + /** Builds minimal set of actions, which will execute fewer calls to Cassandra while producing the same result */ + private object State { + def apply(records: NonEmptyList[ActionRecord[Action]]): State = { + records.reverse.foldLeft(State()) { _.handle(_) } + } + } + + private final case class State( + private val purge: Option[Purge] = None, + private val appends: Option[Appends] = None, + private val delete: Option[Delete] = None, + ) { + // Expects records to be provided in reversed order, e.g., youngest first + private def handle: ActionRecord[Action] => State = { + case _ if this.purge.nonEmpty => // ignore all actions before `Purge` + this + + case ActionRecord(_: Action.Mark, _) => + this + + case ActionRecord(purge: Action.Purge, partitionOffset: PartitionOffset) => + handlePurge(purge, partitionOffset) + + case ActionRecord(delete: 
Action.Delete, partitionOffset: PartitionOffset) => + handleDelete(delete, partitionOffset) + + case ActionRecord(append: Action.Append, partitionOffset: PartitionOffset) => + handleAppend(append, partitionOffset) + } + + private def handlePurge(purge: Action.Purge, partitionOffset: PartitionOffset): State = { + this.copy( + purge = Purge(partitionOffset.offset, purge.origin, purge.version).some, + ) + } + + private def handleDelete(delete: Action.Delete, partitionOffset: PartitionOffset): State = { + val delete_ = this.delete match { + case Some(younger) => + // take `origin` and `version` from "older" entity, if it has them + val origin = delete.origin.orElse(younger.origin) + val version = delete.version.orElse(younger.version) + // make `Delete` action with largest `seqNr` and largest `offset` + if (younger.to < delete.to) Delete(partitionOffset.offset, delete.to, origin, version) + else younger.copy(origin = origin, version = version) + + case None => + Delete(partitionOffset.offset, delete.to, delete.origin, delete.version) + } + this.copy( + delete = delete_.some, + ) + } + + private def handleAppend(append: Action.Append, partitionOffset: PartitionOffset): State = { + // ignore `Action.Append`, if it would get deleted + if (this.delete.forall(_.to.value <= append.range.to)) { + val appends = this.appends match { + case Some(appends) => + appends.copy(records = ActionRecord(append, partitionOffset) :: appends.records) + case None => + Appends(partitionOffset.offset, NonEmptyList.of(ActionRecord(append, partitionOffset))) + } + this.copy( + appends = appends.some, + ) + } else { + this + } + } + + def batches: List[Batch_Alternative_with_Aggressive_Reshuffling] = { + // we can drop first `append`, if `deleteTo` will discard it AND there is at least one more `append` + // we have to preserve one `append` to store latest `seqNr` and populate `expireAfter` + val appends = { + this.appends.flatMap { appends => + val deleteTo = this.delete.map(_.to.value) + val 
records = appends.records + val actions = + if (deleteTo.contains(records.head.action.range.to)) NonEmptyList.fromList(records.tail).getOrElse(records) + else records + appends.copy(records = actions).some + } + } + + // if `delete` was not last action, adjust `delete`'s batch offset to update `metajournal` correctly + val delete = appends match { + case Some(appends) => this.delete.map(delete => delete.copy(offset = delete.offset max appends.offset)) + case None => this.delete + } + + // apply action batches in order: `Purge`, `Append`s and `Delete` + List(purge, appends, delete).flatten + } + } + + final case class Appends( + offset: Offset, + records: NonEmptyList[ActionRecord[Action.Append]], + ) extends Batch_Alternative_with_Aggressive_Reshuffling + + final case class Delete( + offset: Offset, + to: DeleteTo, + origin: Option[Origin], + version: Option[Version], + ) extends Batch_Alternative_with_Aggressive_Reshuffling + + final case class Purge( + offset: Offset, + origin: Option[Origin], + version: Option[Version], + ) extends Batch_Alternative_with_Aggressive_Reshuffling +} diff --git a/build.sbt b/build.sbt index 2f85e12b2..552fddc25 100644 --- a/build.sbt +++ b/build.sbt @@ -230,3 +230,13 @@ lazy val `persistence-circe` = project .settings(name := "kafka-journal-persistence-circe") .settings(commonSettings) .dependsOn(`journal-circe`, persistence % "test->test;compile->compile") + +lazy val benchmark = project + .dependsOn(replicator % "test->test", journal % "test->test;compile->compile", `eventual-cassandra`) + .enablePlugins(JmhPlugin) + .settings(commonSettings) + .settings( + Jmh / sourceDirectory := (Test / sourceDirectory).value, + Jmh / classDirectory := (Test / classDirectory).value, + Jmh / dependencyClasspath := (Test / dependencyClasspath).value, + ) diff --git a/project/plugins.sbt b/project/plugins.sbt index f4597230f..94089c99a 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -13,3 +13,5 @@ addSbtPlugin("ch.epfl.scala" % 
"sbt-version-policy" % "3.2.1") addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.13.0") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2") + +addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.7")