Separate CassandraSnapshotStoreConfig from KafkaJournalConfig.

rtar committed Dec 5, 2023
1 parent a7cc3e0, commit cdd7458

Showing 8 changed files with 142 additions and 34 deletions.
@@ -14,18 +14,14 @@ import java.time.Instant
 
 object SnapshotCassandra {
 
-  // TODO: make it configurable
-  val BufferSize = 10
-
   def of[F[_]: Async: Parallel: LogOf](
-    config: EventualCassandraConfig,
+    config: SnapshotCassandraConfig,
     origin: Option[Origin],
     cassandraClusterOf: CassandraClusterOf[F]
   ): Resource[F, SnapshotStoreFlat[F]] = {
 
-    def store(implicit cassandraCluster: CassandraCluster[F], cassandraSession: CassandraSession[F]) = {
-      of(config.schema, origin, config.consistencyConfig)
-    }
+    def store(implicit cassandraCluster: CassandraCluster[F], cassandraSession: CassandraSession[F]) =
+      of(config.schema, origin, config.consistencyConfig, config.numberOfSnapshots)
 
     for {
       cassandraCluster <- CassandraCluster.of[F](config.client, cassandraClusterOf, config.retries)
@@ -38,16 +34,17 @@ object SnapshotCassandra {
   def of[F[_]: Temporal: Parallel: CassandraCluster: CassandraSession: LogOf](
     schemaConfig: SchemaConfig,
     origin: Option[Origin],
-    consistencyConfig: ConsistencyConfig
+    consistencyConfig: ConsistencyConfig,
+    numberOfSnapshots: Int
   ): F[SnapshotStoreFlat[F]] =
     for {
       schema     <- SetupSchema[F](schemaConfig, origin, consistencyConfig)
       statements <- Statements.of[F](schema, consistencyConfig)
-    } yield SnapshotCassandra(statements)
+    } yield SnapshotCassandra(statements, numberOfSnapshots)
 
   private sealed abstract class Main
 
-  def apply[F[_]: MonadThrow](statements: Statements[F]): SnapshotStoreFlat[F] = {
+  def apply[F[_]: MonadThrow](statements: Statements[F], numberOfSnapshots: Int): SnapshotStoreFlat[F] = {
     new Main with SnapshotStoreFlat[F] {
 
       def save(key: Key, snapshot: SnapshotRecord[EventualPayloadAndType]): F[Unit] = {
@@ -56,7 +53,7 @@
          case s if s.values.exists { case (seqNr, _) => snapshot.snapshot.seqNr == seqNr } =>
            update(key, s, snapshot)
          // there is a free place to add a snapshot
-         case s if s.size < BufferSize => insert(key, s, snapshot)
+         case s if s.size < numberOfSnapshots => insert(key, s, snapshot)
          // all rows are taken, we have to update one of them
          case s => replace(key, s, snapshot)
        }
@@ -67,7 +64,7 @@
        savedSnapshots: Map[BufferNr, (SeqNr, Instant)],
        snapshot: SnapshotRecord[EventualPayloadAndType]
      ): F[Unit] = {
-       val allBufferNrs = BufferNr.listOf(BufferSize)
+       val allBufferNrs = BufferNr.listOf(numberOfSnapshots)
        val takenBufferNrs = savedSnapshots.keySet
        val freeBufferNr = allBufferNrs.find(bufferNr => !takenBufferNrs.contains(bufferNr))
        MonadThrow[F].fromOption(freeBufferNr, SnapshotStoreError("Could not find a free key")).flatMap { bufferNr =>
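For context, the `save` flow above picks one of three write paths: update the row that already holds the same `seqNr`, insert into a free slot, or, once all `numberOfSnapshots` slots are taken, replace an occupied one. Below is a minimal sketch of that slot-selection decision, assuming oldest-first eviction as the config docs describe; the `chooseSlot` helper and the plain `Int`/`Long` stand-ins for `BufferNr`/`SeqNr` are illustrative, not part of the commit:

```scala
import java.time.Instant

object SlotSelection {

  sealed trait Action
  final case class Update(bufferNr: Int) extends Action  // same seqNr already stored: overwrite that row
  final case class Insert(bufferNr: Int) extends Action  // a slot is still free: take the first one
  final case class Replace(bufferNr: Int) extends Action // buffer is full: evict the oldest snapshot

  def chooseSlot(
    saved: Map[Int, (Long, Instant)], // bufferNr -> (seqNr, timestamp)
    seqNr: Long,
    numberOfSnapshots: Int
  ): Action =
    saved
      .collectFirst { case (bufferNr, (`seqNr`, _)) => Update(bufferNr): Action }
      .orElse {
        (0 until numberOfSnapshots)
          .find(bufferNr => !saved.contains(bufferNr))
          .map(Insert(_))
      }
      .getOrElse {
        // all slots taken: overwrite the row with the smallest seqNr, i.e. the oldest snapshot
        Replace(saved.minBy { case (_, (storedSeqNr, _)) => storedSeqNr }._1)
      }
}

// e.g. chooseSlot(Map(0 -> (1L, Instant.EPOCH)), seqNr = 2L, numberOfSnapshots = 1) == Replace(0)
```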
@@ -0,0 +1,45 @@
+package com.evolutiongaming.kafka.journal.eventual.cassandra
+
+import com.datastax.driver.core.ConsistencyLevel
+import com.evolutiongaming.scassandra.{CassandraConfig, QueryConfig}
+import pureconfig.ConfigReader
+import pureconfig.generic.semiauto.deriveReader
+
+/** Cassandra-specific configuration used by a plugin.
+  *
+  * Specifies long-term storage configuration and Cassandra client parameters.
+  *
+  * @param retries
+  *   Number of retries in [[com.evolutiongaming.scassandra.NextHostRetryPolicy]]. It will retry a request on the
+  *   same host if it timed out, or switch to another host if an error happened, or if the host was not available
+  *   on the first attempt.
+  * @param numberOfSnapshots
+  *   Maximum number of snapshots to be stored per single persistence id. If the number of snapshots reaches this
+  *   number, but a new snapshot is requested to be written, then the oldest snapshot will be overwritten.
+  * @param client
+  *   Cassandra client configuration, see [[CassandraConfig]] for more details.
+  * @param schema
+  *   Schema of the Cassandra database, i.e. keyspace, names of the tables etc. It also contains a flag indicating
+  *   whether the schema should be created automatically if not present, which is useful for integration testing.
+  * @param consistencyConfig
+  *   Consistency levels to use for read and for write statements to Cassandra. The main reason one may want to
+  *   change them is for integration tests with a small number of Cassandra nodes.
+  */
+final case class SnapshotCassandraConfig(
+  retries: Int = 100,
+  numberOfSnapshots: Int = 10,
+  client: CassandraConfig = CassandraConfig(
+    name = "snapshot",
+    query = QueryConfig(consistency = ConsistencyLevel.LOCAL_QUORUM, fetchSize = 1000, defaultIdempotence = true)
+  ),
+  schema: SchemaConfig = SchemaConfig.default,
+  consistencyConfig: EventualCassandraConfig.ConsistencyConfig = EventualCassandraConfig.ConsistencyConfig.default
+)
+
+object SnapshotCassandraConfig {
+
+  implicit val configReaderSnapshotCassandraConfig: ConfigReader[SnapshotCassandraConfig] = deriveReader
+
+  val default: SnapshotCassandraConfig = SnapshotCassandraConfig()
+
+}
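For reference, a sketch (not from the commit) of how this config is picked up: `deriveReader` maps case-class fields to kebab-case keys by pureconfig's default naming, so `numberOfSnapshots` is read from `number-of-snapshots`, and omitted keys fall back to the case-class defaults:

```scala
import com.evolutiongaming.kafka.journal.eventual.cassandra.SnapshotCassandraConfig
import com.typesafe.config.ConfigFactory
import pureconfig.ConfigSource

// Hypothetical HOCON overriding two fields; everything else keeps its default.
val hocon = ConfigFactory.parseString(
  """
  retries = 5
  number-of-snapshots = 20
  """
)

val loaded = ConfigSource.fromConfig(hocon).load[SnapshotCassandraConfig]
// Right(SnapshotCassandraConfig(retries = 5, numberOfSnapshots = 20, ...))
```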
@@ -14,10 +14,12 @@ class SnapshotCassandraTest extends AnyFunSuite {
   type SnaphsotWithPayload = SnapshotRecord[EventualPayloadAndType]
   type F[A] = StateT[Try, DatabaseState, A]
 
+  val numberOfSnapshots: Int = 10
+
   test("save and load") {
     val program = for {
       statements <- statements.pure[F]
-      store = SnapshotCassandra(statements)
+      store = SnapshotCassandra(statements, numberOfSnapshots)
       key = Key("topic", "id")
       _ <- store.save(key, record)
       _ <- DatabaseState.sync
@@ -32,11 +34,14 @@
     val program = for {
       statements <- statements.pure[F]
       // both snapshotters see empty metadata, because it is not saved yet
-      store = SnapshotCassandra[F](statements.copy(selectMetadata = { _ =>
-        // sync data after first call to simulate delayed update
-        // otherwise the `selectMetadata` call may be stuck in an infinite loop
-        DatabaseState.get.map(_.metadata) <* DatabaseState.sync
-      }))
+      store = SnapshotCassandra[F](
+        statements.copy(selectMetadata = { _ =>
+          // sync data after first call to simulate delayed update
+          // otherwise the `selectMetadata` call may be stuck in an infinite loop
+          DatabaseState.get.map(_.metadata) <* DatabaseState.sync
+        }),
+        numberOfSnapshots
+      )
       key = Key("topic", "id")
       snapshot1 = record.snapshot.copy(seqNr = SeqNr.unsafe(1))
       snapshot2 = record.snapshot.copy(seqNr = SeqNr.unsafe(2))
@@ -55,7 +60,7 @@
   test("save is idempotent") {
     val program = for {
       statements <- statements.pure[F]
-      store = SnapshotCassandra[F](statements)
+      store = SnapshotCassandra[F](statements, numberOfSnapshots)
       key = Key("topic", "id")
       // try to save twice
       _ <- store.save(key, record)
@@ -73,7 +78,7 @@
   test("drop all") {
     val program = for {
       statements <- statements.pure[F]
-      store = SnapshotCassandra(statements)
+      store = SnapshotCassandra(statements, numberOfSnapshots)
       key = Key("topic", "id")
       snapshot1 = record.snapshot.copy(seqNr = SeqNr.unsafe(1))
       snapshot2 = record.snapshot.copy(seqNr = SeqNr.unsafe(2))
@@ -93,7 +98,7 @@
   test("drop by seqNr") {
     val program = for {
       statements <- statements.pure[F]
-      store = SnapshotCassandra(statements)
+      store = SnapshotCassandra(statements, numberOfSnapshots)
       key = Key("topic", "id")
       snapshot1 = record.snapshot.copy(seqNr = SeqNr.unsafe(1))
       snapshot2 = record.snapshot.copy(seqNr = SeqNr.unsafe(2))
@@ -10,6 +10,7 @@ import cats.effect.unsafe.{IORuntime, IORuntimeConfig}
 import cats.syntax.all._
 import com.evolutiongaming.catshelper.CatsHelper._
 import com.evolutiongaming.catshelper.{FromFuture, LogOf, ToFuture}
+import com.evolutiongaming.kafka.journal.eventual.cassandra.SnapshotCassandraConfig
 import com.evolutiongaming.kafka.journal.util.CatsHelper._
 import com.evolutiongaming.kafka.journal.util.PureConfigHelper._
 import com.evolutiongaming.kafka.journal.{JsonCodec, LogOfFromAkka, Origin, Payload, SnapshotReadWrite}
@@ -69,7 +70,7 @@ class CassandraSnapshotStore(config: Config) extends SnapshotStore { actor =>
   def adapterIO: Resource[IO, SnapshotStoreAdapter[IO]] = {
     for {
       snapshotSerializer <- serializer
-      config             <- kafkaJournalConfig.toResource
+      config             <- cassandraSnapshotStoreConfig.toResource
       snapshotReadWrite  <- snapshotReadWrite(config).toResource
       adapter            <- adapterIO(config, snapshotSerializer, snapshotReadWrite)
     } yield adapter
@@ -80,13 +81,13 @@
     snapshotReadWrite: SnapshotReadWrite[IO, A]
   ): Resource[IO, SnapshotStoreAdapter[IO]] = {
     for {
-      config  <- kafkaJournalConfig.toResource
+      config  <- cassandraSnapshotStoreConfig.toResource
       adapter <- adapterIO(config, snapshotSerializer, snapshotReadWrite)
     } yield adapter
   }
 
   def adapterIO[A](
-    config: KafkaJournalConfig,
+    config: CassandraSnapshotStoreConfig,
     snapshotSerializer: SnapshotSerializer[IO, A],
     snapshotReadWrite: SnapshotReadWrite[IO, A]
   ): Resource[IO, SnapshotStoreAdapter[IO]] = {
@@ -104,7 +105,7 @@
         origin = origin,
         snapshotSerializer = snapshotSerializer,
         snapshotReadWrite = snapshotReadWrite,
-        config = config,
+        config = config.cassandra,
         cassandraClusterOf = cassandraClusterOf
       )(logOf = logOf)
     } yield adapter
@@ -140,7 +141,7 @@ class CassandraSnapshotStore(config: Config) extends SnapshotStore { actor =>
     origin: Option[Origin],
     snapshotSerializer: SnapshotSerializer[IO, A],
     snapshotReadWrite: SnapshotReadWrite[IO, A],
-    config: KafkaJournalConfig,
+    config: SnapshotCassandraConfig,
     cassandraClusterOf: CassandraClusterOf[IO]
   )(implicit logOf: LogOf[IO]): Resource[IO, SnapshotStoreAdapter[IO]] =
     SnapshotStoreAdapter.of[IO, A](
@@ -165,24 +166,24 @@
       .value
   }
 
-  def kafkaJournalConfig: IO[KafkaJournalConfig] =
+  def cassandraSnapshotStoreConfig: IO[CassandraSnapshotStoreConfig] =
     ConfigSource
       .fromConfig(config)
-      .load[KafkaJournalConfig]
+      .load[CassandraSnapshotStoreConfig]
      .liftTo[IO]
 
   def serializer: Resource[IO, SnapshotSerializer[IO, Payload]] =
     SnapshotSerializer.of[IO](system).toResource
 
-  def snapshotReadWrite(config: KafkaJournalConfig): IO[SnapshotReadWrite[IO, Payload]] =
+  def snapshotReadWrite(config: CassandraSnapshotStoreConfig): IO[SnapshotReadWrite[IO, Payload]] =
     for {
       jsonCodec <- jsonCodec(config)
     } yield {
       implicit val jsonCodec1 = jsonCodec
       SnapshotReadWrite.of[IO, Payload]
     }
 
-  def jsonCodec(config: KafkaJournalConfig): IO[JsonCodec[IO]] = {
+  def jsonCodec(config: CassandraSnapshotStoreConfig): IO[JsonCodec[IO]] = {
     val codec: JsonCodec[IO] = config.jsonCodec match {
       case KafkaJournalConfig.JsonCodec.Default  => JsonCodec.default
       case KafkaJournalConfig.JsonCodec.PlayJson => JsonCodec.playJson
@@ -0,0 +1,56 @@
+package akka.persistence.kafka.journal
+
+import com.evolutiongaming.kafka.journal.eventual.cassandra.SnapshotCassandraConfig
+import pureconfig.generic.semiauto.deriveReader
+import pureconfig.{ConfigCursor, ConfigReader, ConfigSource}
+
+import scala.concurrent.duration._
+
+/** Configuration for [[CassandraSnapshotStore]].
+  *
+  * This case class specifies configuration that could be set using `application.conf` (see `reference.conf` for an
+  * example of such configuration).
+  *
+  * @param cassandra
+  *   Cassandra-specific configuration used by a plugin.
+  * @param startTimeout
+  *   The timeout to create a snapshot store adapter. Starting the snapshot store involves some effectful steps, such
+  *   as creating a Cassandra session, so, in case of infrastructure or configuration troubles, it could take a longer
+  *   time. Creating the snapshot store will fail with [[TimeoutException]] if it takes longer than `startTimeout`.
+  * @param stopTimeout
+  *   This is meant to be a counterpart to `startTimeout`, allowing resource release to time out with an error. This
+  *   parameter is not used, for now, and `startTimeout` is used instead.
+  * @param jsonCodec
+  *   JSON codec to use for (de)serialization of snapshots from [[scodec.bits.ByteVector]] to
+  *   [[play.api.libs.json.JsValue]] and vice versa. This parameter is only relevant if the default
+  *   [[CassandraSnapshotStore]] is used, i.e. it is not taken into account if Circe JSON or other custom serialization is used.
+  *
+  * @see
+  *   [[KafkaJournal]] for more details.
+  */
+final case class CassandraSnapshotStoreConfig(
+  cassandra: SnapshotCassandraConfig = SnapshotCassandraConfig.default,
+  startTimeout: FiniteDuration = 1.minute,
+  stopTimeout: FiniteDuration = 1.minute,
+  jsonCodec: KafkaJournalConfig.JsonCodec = KafkaJournalConfig.JsonCodec.Default
+)
+
+object CassandraSnapshotStoreConfig {
+
+  val default: CassandraSnapshotStoreConfig = CassandraSnapshotStoreConfig()
+
+  implicit val configReaderCassandraSnapshotStoreConfig: ConfigReader[CassandraSnapshotStoreConfig] = {
+
+    val configReader = deriveReader[CassandraSnapshotStoreConfig]
+
+    cursor: ConfigCursor => {
+      for {
+        cursor <- cursor.asObjectCursor
+        config  = cursor.objValue.toConfig
+        source  = ConfigSource.fromConfig(config)
+        config <- source.load(configReader)
+      } yield config
+    }
+  }
+
+}
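A sketch (not from the commit) of how the whole plugin section would be read, showing the nesting this change introduces: the `cassandra` block is parsed as `SnapshotCassandraConfig`, while the timeouts and codec stay at the top level. Key names assume pureconfig's default kebab-case mapping:

```scala
import akka.persistence.kafka.journal.CassandraSnapshotStoreConfig
import com.typesafe.config.ConfigFactory
import pureconfig.ConfigSource

// Hypothetical plugin section; unspecified fields keep their defaults.
val plugin = ConfigFactory.parseString(
  """
  start-timeout = 30s
  cassandra {
    retries = 10
    number-of-snapshots = 5
  }
  """
)

val loaded = ConfigSource.fromConfig(plugin).load[CassandraSnapshotStoreConfig]
// Right(CassandraSnapshotStoreConfig(
//   cassandra = SnapshotCassandraConfig(retries = 10, numberOfSnapshots = 5, ...),
//   startTimeout = 30 seconds, ...))
```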
@@ -8,7 +8,7 @@ import com.evolutiongaming.catshelper.LogOf
 import com.evolutiongaming.kafka.journal
 import com.evolutiongaming.kafka.journal._
 import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType
-import com.evolutiongaming.kafka.journal.eventual.cassandra.SnapshotCassandra
+import com.evolutiongaming.kafka.journal.eventual.cassandra.{SnapshotCassandra, SnapshotCassandraConfig}
 import com.evolutiongaming.kafka.journal.util.Fail
 import com.evolutiongaming.scassandra.CassandraClusterOf
 
@@ -33,7 +33,7 @@ object SnapshotStoreAdapter {
     origin: Option[Origin],
     snapshotSerializer: SnapshotSerializer[F, A],
     snapshotReadWrite: SnapshotReadWrite[F, A],
-    config: KafkaJournalConfig,
+    config: SnapshotCassandraConfig,
     cassandraClusterOf: CassandraClusterOf[F]
   ): Resource[F, SnapshotStoreAdapter[F]] = {
 
@@ -43,7 +43,7 @@
       SnapshotStoreAdapter(store, toKey, origin)
 
     for {
-      store <- SnapshotCassandra.of(config.cassandra, origin, cassandraClusterOf)
+      store <- SnapshotCassandra.of(config, origin, cassandraClusterOf)
     } yield adapter(store)(snapshotSerializer, snapshotReadWrite)
   }
 
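With the dedicated config type the adapter no longer digs the Cassandra settings out of a `KafkaJournalConfig`. A sketch of the resulting wiring (hypothetical helper, `F` fixed to `IO`; only the signature shown in the diff is assumed):

```scala
import cats.effect.IO
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.journal.eventual.cassandra.{SnapshotCassandra, SnapshotCassandraConfig}
import com.evolutiongaming.scassandra.CassandraClusterOf

// Hypothetical wiring, not from the commit: the snapshot store is built
// directly from its own standalone config.
def snapshotStore(
  cassandraClusterOf: CassandraClusterOf[IO]
)(implicit logOf: LogOf[IO]) =
  SnapshotCassandra.of[IO](
    config             = SnapshotCassandraConfig.default,
    origin             = None,
    cassandraClusterOf = cassandraClusterOf
  ) // : Resource[IO, SnapshotStoreFlat[IO]]
```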
tests/src/test/resources/application.conf: 4 additions & 0 deletions
@@ -47,6 +47,10 @@ evolutiongaming.kafka-journal {
     }
   }
 
+  persistence.snapshot {
+    cassandra = ${evolutiongaming.kafka-journal.cassandra}
+  }
+
   replicator {
     topic-discovery-interval = 1s
 
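The test config reuses the journal's Cassandra settings through a HOCON substitution rather than duplicating them. A sketch (illustrative keys, not from the commit) of how Typesafe Config resolves such a reference before pureconfig ever sees the tree:

```scala
import com.typesafe.config.ConfigFactory

val raw = ConfigFactory.parseString(
  """
  evolutiongaming.kafka-journal.cassandra.client.name = "test"
  persistence.snapshot.cassandra = ${evolutiongaming.kafka-journal.cassandra}
  """
)

// resolve() substitutes the reference, so the snapshot section ends up
// with a copy of the journal's Cassandra settings
val resolved = raw.resolve()
resolved.getString("persistence.snapshot.cassandra.client.name") // "test"
```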
@@ -28,7 +28,7 @@ object SnapshotStorePerfSpec {
   var snapshotSavingFinishedAt: Map[Long, Long] = Map.empty
 
   // we do not want incoming events to affect measurement of snapshot saving
-  // so we will be stashing them when snaphotting is happening
+  // so we will be stashing them when snapshotting is happening
   var savingSnapshot = false
 
   var loadingSnapshot = true
