From cdd745881612a0a19ac8c06f697c88b7cc547196 Mon Sep 17 00:00:00 2001 From: Ruslans Tarasovs Date: Mon, 4 Dec 2023 16:53:47 +0200 Subject: [PATCH] Separate CassandraSnapshotStoreConfig from KafkaJournalConfig. --- .../cassandra/SnapshotCassandra.scala | 21 +++---- .../cassandra/SnapshotCassandraConfig.scala | 45 +++++++++++++++ .../cassandra/SnapshotCassandraTest.scala | 23 +++++--- .../journal/CassandraSnapshotStore.scala | 19 ++++--- .../CassandraSnapshotStoreConfig.scala | 56 +++++++++++++++++++ .../kafka/journal/SnapshotStoreAdapter.scala | 6 +- tests/src/test/resources/application.conf | 4 ++ .../snapshot/SnapshotStorePerfSpec.scala | 2 +- 8 files changed, 142 insertions(+), 34 deletions(-) create mode 100644 eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraConfig.scala create mode 100644 persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStoreConfig.scala diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandra.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandra.scala index 82330c780..bb1e35391 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandra.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandra.scala @@ -14,18 +14,14 @@ import java.time.Instant object SnapshotCassandra { - // TODO: make it configurable - val BufferSize = 10 - def of[F[_]: Async: Parallel: LogOf]( - config: EventualCassandraConfig, + config: SnapshotCassandraConfig, origin: Option[Origin], cassandraClusterOf: CassandraClusterOf[F] ): Resource[F, SnapshotStoreFlat[F]] = { - def store(implicit cassandraCluster: CassandraCluster[F], cassandraSession: CassandraSession[F]) = { - of(config.schema, origin, config.consistencyConfig) - } + def store(implicit cassandraCluster: CassandraCluster[F], cassandraSession: CassandraSession[F]) = + of(config.schema, origin, config.consistencyConfig, config.numberOfSnapshots) for { cassandraCluster <- CassandraCluster.of[F](config.client, cassandraClusterOf, config.retries) @@ -38,16 +34,17 @@ object SnapshotCassandra { def of[F[_]: Temporal: Parallel: CassandraCluster: CassandraSession: LogOf]( schemaConfig: SchemaConfig, origin: Option[Origin], - consistencyConfig: ConsistencyConfig + consistencyConfig: ConsistencyConfig, + numberOfSnapshots: Int ): F[SnapshotStoreFlat[F]] = for { schema <- SetupSchema[F](schemaConfig, origin, consistencyConfig) statements <- Statements.of[F](schema, consistencyConfig) - } yield SnapshotCassandra(statements) + } yield SnapshotCassandra(statements, numberOfSnapshots) private sealed abstract class Main - def apply[F[_]: MonadThrow](statements: Statements[F]): SnapshotStoreFlat[F] = { + def apply[F[_]: MonadThrow](statements: Statements[F], numberOfSnapshots: Int): SnapshotStoreFlat[F] = { new Main with SnapshotStoreFlat[F] { def save(key: Key, snapshot: SnapshotRecord[EventualPayloadAndType]): F[Unit] = { @@ -56,7 +53,7 @@ object SnapshotCassandra { case s if s.values.exists { case (seqNr, _) => snapshot.snapshot.seqNr == seqNr } => update(key, s, snapshot) // there is a free place to add a snapshot - case s if s.size < BufferSize => insert(key, s, snapshot) + case s if s.size < numberOfSnapshots => insert(key, s, snapshot) // all rows are taken, we have to update one of them case s => replace(key, s, snapshot) } @@ -67,7 +64,7 @@ object SnapshotCassandra { savedSnapshots: Map[BufferNr, (SeqNr, Instant)], snapshot: SnapshotRecord[EventualPayloadAndType] ): F[Unit] = { - val allBufferNrs = BufferNr.listOf(BufferSize) + val allBufferNrs = BufferNr.listOf(numberOfSnapshots) val takenBufferNrs = savedSnapshots.keySet val freeBufferNr = allBufferNrs.find(bufferNr => !takenBufferNrs.contains(bufferNr)) MonadThrow[F].fromOption(freeBufferNr, SnapshotStoreError("Could not find a free key")).flatMap { bufferNr => diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraConfig.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraConfig.scala new file mode 100644 index 000000000..c475ed31e --- /dev/null +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraConfig.scala @@ -0,0 +1,45 @@ +package com.evolutiongaming.kafka.journal.eventual.cassandra + +import com.datastax.driver.core.ConsistencyLevel +import com.evolutiongaming.scassandra.{CassandraConfig, QueryConfig} +import pureconfig.ConfigReader +import pureconfig.generic.semiauto.deriveReader + +/** Cassandra-specific configuration used by a plugin. + * + * Specifies long time storage configuration and Cassandra client parameters. + * + * @param retries + * Number of retries in [[com.evolutiongaming.scassandra.NextHostRetryPolicy]]. It will retry doing a request on the + * same host if it timed out, or switch to another host if error happened, or the host was not available on a first + * attempt. + * @param numberOfSnapshots + * Maximum number of snapshots to be stored per single persistence id. If the number of snapshots reaches this + * number, but a new snapshot is requsted to be written, then the oldest snapshot will be overwritten. + * @param client + * Cassandra client configuration, see [[CassandraConfig]] for more details. + * @param schema + * Schema of Cassandra database, i.e. keyspace, names of the tables etc. It also contains a flag if schema should be + * automatically created if not present, which is useful for integration testing purposes etc. + * @param consistencyConfig + * Consistency levels to use for read and for write statements to Cassandra. The main reason one may be interested to + * change it, is for integration tests with small number of Cassandra nodes. + */ +final case class SnapshotCassandraConfig( + retries: Int = 100, + numberOfSnapshots: Int = 10, + client: CassandraConfig = CassandraConfig( + name = "snapshot", + query = QueryConfig(consistency = ConsistencyLevel.LOCAL_QUORUM, fetchSize = 1000, defaultIdempotence = true) + ), + schema: SchemaConfig = SchemaConfig.default, + consistencyConfig: EventualCassandraConfig.ConsistencyConfig = EventualCassandraConfig.ConsistencyConfig.default +) + +object SnapshotCassandraConfig { + + implicit val configReaderEventualCassandraConfig: ConfigReader[SnapshotCassandraConfig] = deriveReader + + val default: SnapshotCassandraConfig = SnapshotCassandraConfig() + +} diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraTest.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraTest.scala index 50a6064e7..21aa4629c 100644 --- a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraTest.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SnapshotCassandraTest.scala @@ -14,10 +14,12 @@ class SnapshotCassandraTest extends AnyFunSuite { type SnaphsotWithPayload = SnapshotRecord[EventualPayloadAndType] type F[A] = StateT[Try, DatabaseState, A] + val numberOfSnapshots: Int = 10 + test("save and load") { val program = for { statements <- statements.pure[F] - store = SnapshotCassandra(statements) + store = SnapshotCassandra(statements, numberOfSnapshots) key = Key("topic", "id") _ <- store.save(key, record) _ <- DatabaseState.sync @@ -32,11 +34,14 @@ class SnapshotCassandraTest extends AnyFunSuite { val program = for { statements <- statements.pure[F] // both snapshotters see empty metadata, because it is not saved yet - store = SnapshotCassandra[F](statements.copy(selectMetadata = { _ => - // sync data after first call to simulate delayed update - // otherwise the `selectMetadata` call may be stuck in an infinite loop - DatabaseState.get.map(_.metadata) <* DatabaseState.sync - })) + store = SnapshotCassandra[F]( + statements.copy(selectMetadata = { _ => + // sync data after first call to simulate delayed update + // otherwise the `selectMetadata` call may be stuck in an infinite loop + DatabaseState.get.map(_.metadata) <* DatabaseState.sync + }), + numberOfSnapshots + ) key = Key("topic", "id") snapshot1 = record.snapshot.copy(seqNr = SeqNr.unsafe(1)) snapshot2 = record.snapshot.copy(seqNr = SeqNr.unsafe(2)) @@ -55,7 +60,7 @@ class SnapshotCassandraTest extends AnyFunSuite { test("save is idempotent") { val program = for { statements <- statements.pure[F] - store = SnapshotCassandra[F](statements) + store = SnapshotCassandra[F](statements, numberOfSnapshots) key = Key("topic", "id") // try to save twice _ <- store.save(key, record) @@ -73,7 +78,7 @@ class SnapshotCassandraTest extends AnyFunSuite { test("drop all") { val program = for { statements <- statements.pure[F] - store = SnapshotCassandra(statements) + store = SnapshotCassandra(statements, numberOfSnapshots) key = Key("topic", "id") snapshot1 = record.snapshot.copy(seqNr = SeqNr.unsafe(1)) snapshot2 = record.snapshot.copy(seqNr = SeqNr.unsafe(2)) @@ -93,7 +98,7 @@ class SnapshotCassandraTest extends AnyFunSuite { test("drop by seqNr") { val program = for { statements <- statements.pure[F] - store = SnapshotCassandra(statements) + store = SnapshotCassandra(statements, numberOfSnapshots) key = Key("topic", "id") snapshot1 = record.snapshot.copy(seqNr = SeqNr.unsafe(1)) snapshot2 = record.snapshot.copy(seqNr = SeqNr.unsafe(2)) diff --git a/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStore.scala b/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStore.scala index a70561d70..45591cbfc 100644 --- a/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStore.scala +++ b/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStore.scala @@ -10,6 +10,7 @@ import cats.effect.unsafe.{IORuntime, IORuntimeConfig} import cats.syntax.all._ import com.evolutiongaming.catshelper.CatsHelper._ import com.evolutiongaming.catshelper.{FromFuture, LogOf, ToFuture} +import com.evolutiongaming.kafka.journal.eventual.cassandra.SnapshotCassandraConfig import com.evolutiongaming.kafka.journal.util.CatsHelper._ import com.evolutiongaming.kafka.journal.util.PureConfigHelper._ import com.evolutiongaming.kafka.journal.{JsonCodec, LogOfFromAkka, Origin, Payload, SnapshotReadWrite} @@ -69,7 +70,7 @@ class CassandraSnapshotStore(config: Config) extends SnapshotStore { actor => def adapterIO: Resource[IO, SnapshotStoreAdapter[IO]] = { for { snapshotSerializer <- serializer - config <- kafkaJournalConfig.toResource + config <- cassandraSnapshotStoreConfig.toResource snapshotReadWrite <- snapshotReadWrite(config).toResource adapter <- adapterIO(config, snapshotSerializer, snapshotReadWrite) } yield adapter @@ -80,13 +81,13 @@ class CassandraSnapshotStore(config: Config) extends SnapshotStore { actor => snapshotReadWrite: SnapshotReadWrite[IO, A] ): Resource[IO, SnapshotStoreAdapter[IO]] = { for { - config <- kafkaJournalConfig.toResource + config <- cassandraSnapshotStoreConfig.toResource adapter <- adapterIO(config, snapshotSerializer, snapshotReadWrite) } yield adapter } def adapterIO[A]( - config: KafkaJournalConfig, + config: CassandraSnapshotStoreConfig, snapshotSerializer: SnapshotSerializer[IO, A], snapshotReadWrite: SnapshotReadWrite[IO, A] ): Resource[IO, SnapshotStoreAdapter[IO]] = { @@ -104,7 +105,7 @@ class CassandraSnapshotStore(config: Config) extends SnapshotStore { actor => origin = origin, snapshotSerializer = snapshotSerializer, snapshotReadWrite = snapshotReadWrite, - config = config, + config = config.cassandra, cassandraClusterOf = cassandraClusterOf )(logOf = logOf) } yield adapter @@ -140,7 +141,7 @@ class CassandraSnapshotStore(config: Config) extends SnapshotStore { actor => origin: Option[Origin], snapshotSerializer: SnapshotSerializer[IO, A], snapshotReadWrite: SnapshotReadWrite[IO, A], - config: KafkaJournalConfig, + config: SnapshotCassandraConfig, cassandraClusterOf: CassandraClusterOf[IO] )(implicit logOf: LogOf[IO]): Resource[IO, SnapshotStoreAdapter[IO]] = SnapshotStoreAdapter.of[IO, A]( @@ -165,16 +166,16 @@ class CassandraSnapshotStore(config: Config) extends SnapshotStore { actor => .value } - def kafkaJournalConfig: IO[KafkaJournalConfig] = + def cassandraSnapshotStoreConfig: IO[CassandraSnapshotStoreConfig] = ConfigSource .fromConfig(config) - .load[KafkaJournalConfig] + .load[CassandraSnapshotStoreConfig] .liftTo[IO] def serializer: Resource[IO, SnapshotSerializer[IO, Payload]] = SnapshotSerializer.of[IO](system).toResource - def snapshotReadWrite(config: KafkaJournalConfig): IO[SnapshotReadWrite[IO, Payload]] = + def snapshotReadWrite(config: CassandraSnapshotStoreConfig): IO[SnapshotReadWrite[IO, Payload]] = for { jsonCodec <- jsonCodec(config) } yield { @@ -182,7 +183,7 @@ class CassandraSnapshotStore(config: Config) extends SnapshotStore { actor => SnapshotReadWrite.of[IO, Payload] } - def jsonCodec(config: KafkaJournalConfig): IO[JsonCodec[IO]] = { + def jsonCodec(config: CassandraSnapshotStoreConfig): IO[JsonCodec[IO]] = { val codec: JsonCodec[IO] = config.jsonCodec match { case KafkaJournalConfig.JsonCodec.Default => JsonCodec.default case KafkaJournalConfig.JsonCodec.PlayJson => JsonCodec.playJson diff --git a/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStoreConfig.scala b/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStoreConfig.scala new file mode 100644 index 000000000..0c2ecd19e --- /dev/null +++ b/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStoreConfig.scala @@ -0,0 +1,56 @@ +package akka.persistence.kafka.journal + +import com.evolutiongaming.kafka.journal.eventual.cassandra.SnapshotCassandraConfig +import pureconfig.generic.semiauto.deriveReader +import pureconfig.{ConfigCursor, ConfigReader, ConfigSource} + +import scala.concurrent.duration._ + +/** Configuration for [[CassandraSnapshotStore]]. + * + * This case class specifies configuration that could be set using `application.conf` (see `reference.conf` for an + * example of such configuration). + * + * @param cassandra + * Cassandra-specific configuration used by a plugin. + * @param startTimeout + * The timeout to create a journal adapter. Starting a journal involves some effectful steps, such as creating + * Cassandra session, so, in case of infrastructure or configuration troubles, it could take a longer time. Creating + * the journal will fail with [[TimeoutException]] if it takes longer than `startTimeout`. + * @param stopTimeout + * This is meant to be a counterpart to `startTimeout`, allowing resource release to timeout with an error. This + * parameter is not used, for now, and `startTimeout` is used instead. + * @param jsonCodec + * JSON codec to use for (de)serialization of the events from [[scodec.bits.ByteVector]] to + * [[play.api.libs.json.JsValue]] and vice-versa. This parameter is only relevant if default [[CassandraSnapshotStore]] is + * used, i.e. it is not taken into account if Circe JSON or other custom serialization is used. + * + * @see + * [[KafkaJournal]] for more details. + */ +final case class CassandraSnapshotStoreConfig( + cassandra: SnapshotCassandraConfig = SnapshotCassandraConfig.default, + startTimeout: FiniteDuration = 1.minute, + stopTimeout: FiniteDuration = 1.minute, + jsonCodec: KafkaJournalConfig.JsonCodec = KafkaJournalConfig.JsonCodec.Default +) + +object CassandraSnapshotStoreConfig { + + val default: CassandraSnapshotStoreConfig = CassandraSnapshotStoreConfig() + + implicit val configReaderKafkaJournalConfig: ConfigReader[CassandraSnapshotStoreConfig] = { + + val configReader = deriveReader[CassandraSnapshotStoreConfig] + + cursor: ConfigCursor => { + for { + cursor <- cursor.asObjectCursor + config = cursor.objValue.toConfig + source = ConfigSource.fromConfig(config) + config <- source.load(configReader) + } yield config + } + } + +} diff --git a/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotStoreAdapter.scala b/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotStoreAdapter.scala index 58d3576b3..fdc8b9ff3 100644 --- a/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotStoreAdapter.scala +++ b/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotStoreAdapter.scala @@ -8,7 +8,7 @@ import com.evolutiongaming.catshelper.LogOf import com.evolutiongaming.kafka.journal import com.evolutiongaming.kafka.journal._ import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType -import com.evolutiongaming.kafka.journal.eventual.cassandra.SnapshotCassandra +import com.evolutiongaming.kafka.journal.eventual.cassandra.{SnapshotCassandra, SnapshotCassandraConfig} import com.evolutiongaming.kafka.journal.util.Fail import com.evolutiongaming.scassandra.CassandraClusterOf @@ -33,7 +33,7 @@ object SnapshotStoreAdapter { origin: Option[Origin], snapshotSerializer: SnapshotSerializer[F, A], snapshotReadWrite: SnapshotReadWrite[F, A], - config: KafkaJournalConfig, + config: SnapshotCassandraConfig, cassandraClusterOf: CassandraClusterOf[F] ): Resource[F, SnapshotStoreAdapter[F]] = { @@ -43,7 +43,7 @@ object SnapshotStoreAdapter { SnapshotStoreAdapter(store, toKey, origin) for { - store <- SnapshotCassandra.of(config.cassandra, origin, cassandraClusterOf) + store <- SnapshotCassandra.of(config, origin, cassandraClusterOf) } yield adapter(store)(snapshotSerializer, snapshotReadWrite) } diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf index f280f0736..0b2d80f0d 100644 --- a/tests/src/test/resources/application.conf +++ b/tests/src/test/resources/application.conf @@ -47,6 +47,10 @@ evolutiongaming.kafka-journal { } } + persistence.snapshot { + cassandra = ${evolutiongaming.kafka-journal.cassandra} + } + replicator { topic-discovery-interval = 1s diff --git a/tests/src/test/scala/akka/persistence/snapshot/SnapshotStorePerfSpec.scala b/tests/src/test/scala/akka/persistence/snapshot/SnapshotStorePerfSpec.scala index a7d9b48e4..ca2494cc2 100644 --- a/tests/src/test/scala/akka/persistence/snapshot/SnapshotStorePerfSpec.scala +++ b/tests/src/test/scala/akka/persistence/snapshot/SnapshotStorePerfSpec.scala @@ -28,7 +28,7 @@ object SnapshotStorePerfSpec { var snapshotSavingFinishedAt: Map[Long, Long] = Map.empty // we do not want incoming events to affect measurement of snapshot saving - // so we will be stashing them when snaphotting is happening + // so we will be stashing them when snapshotting is happening var savingSnapshot = false var loadingSnapshot = true