Skip to content

Commit

Permalink
Introduce "snapshot-cassandra" module.
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Jan 25, 2024
1 parent 1d0e920 commit 79daac1
Show file tree
Hide file tree
Showing 18 changed files with 366 additions and 21 deletions.
15 changes: 12 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ lazy val root = (project in file(".")
replicator,
cassandra,
`eventual-cassandra`,
`snapshot-cassandra`,
`journal-circe`,
`persistence-circe`))

Expand Down Expand Up @@ -107,14 +108,16 @@ lazy val journal = (project in file("journal")
lazy val snapshot = (project in file("snapshot")
settings (name := "kafka-journal-snapshot")
settings commonSettings
dependsOn (journal))
dependsOn (journal)
settings(libraryDependencies ++= Seq(scalatest % Test)))

lazy val persistence = (project in file("persistence")
settings (name := "kafka-journal-persistence")
settings commonSettings
dependsOn (
journal % "test->test;compile->compile",
`eventual-cassandra`)
`eventual-cassandra`,
`snapshot-cassandra`)
settings (libraryDependencies ++= Seq(
`akka-serialization`,
`cats-helper`,
Expand Down Expand Up @@ -174,7 +177,13 @@ lazy val cassandra = (project in file("cassandra")
lazy val `eventual-cassandra` = (project in file("eventual-cassandra")
settings (name := "kafka-journal-eventual-cassandra")
settings commonSettings
dependsOn (cassandra, journal % "test->test;compile->compile", snapshot % "test->test;compile->compile")
dependsOn (cassandra, journal % "test->test;compile->compile")
settings (libraryDependencies ++= Seq(scassandra)))

lazy val `snapshot-cassandra` = (project in file("snapshot-cassandra")
settings (name := "kafka-journal-snapshot-cassandra")
settings commonSettings
dependsOn (cassandra, snapshot % "test->test;compile->compile")
settings (libraryDependencies ++= Seq(scassandra)))

lazy val `journal-circe` = (project in file("circe/core")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ object CreateSchema {

val pointer2 = table(config.pointer2Table, a => Nel.of(Pointer2Statements.createTable(a)))

val snapshot = table(config.snapshotTable, a => Nel.of(SnapshotStatements.createTable(a)))

val setting = table(config.settingTable, a => Nel.of(SettingStatements.createTable(a)))

val schema = Schema(
Expand All @@ -54,12 +52,11 @@ object CreateSchema {
metaJournal = tableName(metaJournal),
pointer = tableName(pointer),
pointer2 = tableName(pointer2),
snapshot = tableName(snapshot),
setting = tableName(setting))

if (config.autoCreate) {
for {
result <- createTables(keyspace, Nel.of(journal, pointer, pointer2, snapshot, setting, metaJournal))
result <- createTables(keyspace, Nel.of(journal, pointer, pointer2, setting, metaJournal))
} yield {
(schema, result)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ final case class Schema(
metaJournal: TableName,
pointer: TableName,
pointer2: TableName,
snapshot: TableName,
setting: TableName)
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class CreateSchemaSpec extends AnyFunSuite with Matchers { self =>
metaJournal = TableName(keyspace = "journal", table = "metajournal"),
pointer = TableName(keyspace = "journal", table = "pointer"),
pointer2 = TableName(keyspace = "journal", table = "pointer2"),
snapshot = TableName(keyspace = "journal", table = "snapshot_buffer"),
setting = TableName(keyspace = "journal", table = "setting"))

val createTables: CreateTables[StateT] = new CreateTables[StateT] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ class SetupSchemaSpec extends AnyFunSuite with Matchers {
metaJournal = TableName(keyspace = "journal", table = "metaJournal"),
pointer = TableName(keyspace = "journal", table = "pointer"),
pointer2 = TableName(keyspace = "journal", table = "pointer2"),
snapshot = TableName(keyspace = "journal", table = "snapshot_buffer"),
setting = TableName(keyspace = "journal", table = "setting"))

implicit val settings: Settings[StateT] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import cats.effect.unsafe.{IORuntime, IORuntimeConfig}
import cats.syntax.all._
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper.{FromFuture, LogOf, ToFuture}
import com.evolutiongaming.kafka.journal.eventual.cassandra.SnapshotCassandraConfig
import com.evolutiongaming.kafka.journal.snapshot.cassandra.SnapshotCassandraConfig
import com.evolutiongaming.kafka.journal.util.CatsHelper._
import com.evolutiongaming.kafka.journal.util.PureConfigHelper._
import com.evolutiongaming.kafka.journal.{JsonCodec, LogOfFromAkka, Origin, Payload, SnapshotReadWrite}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package akka.persistence.kafka.journal

import com.evolutiongaming.kafka.journal.eventual.cassandra.SnapshotCassandraConfig
import com.evolutiongaming.kafka.journal.snapshot.cassandra.SnapshotCassandraConfig
import pureconfig.generic.semiauto.deriveReader
import pureconfig.{ConfigCursor, ConfigReader, ConfigSource}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.journal
import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType
import com.evolutiongaming.kafka.journal.eventual.cassandra.{SnapshotCassandra, SnapshotCassandraConfig}
import com.evolutiongaming.kafka.journal.snapshot.cassandra.{SnapshotCassandra, SnapshotCassandraConfig}
import com.evolutiongaming.kafka.journal.util.Fail
import com.evolutiongaming.scassandra.CassandraClusterOf

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.evolutiongaming.kafka.journal.snapshot.cassandra

import cats.Monad
import cats.data.{NonEmptyList => Nel}
import cats.effect.Concurrent
import cats.syntax.all._
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.journal.eventual.cassandra._
import com.evolutiongaming.scassandra.TableName

object CreateSnapshotSchema {

def apply[F[_] : Concurrent : CassandraCluster : CassandraSession : CassandraSync : LogOf](
config: SnapshotSchemaConfig
): F[(SnapshotSchema, MigrateSchema.Fresh)] = {

for {
createTables <- CreateTables.of[F]
createKeyspace = CreateKeyspace[F]
result <- apply[F](config, createKeyspace, createTables)
} yield result
}

def apply[F[_] : Monad](
config: SnapshotSchemaConfig,
createKeyspace: CreateKeyspace[F],
createTables: CreateTables[F]
): F[(SnapshotSchema, MigrateSchema.Fresh)] = {

def createTables1 = {
val keyspace = config.keyspace.name

def tableName(table: CreateTables.Table) = TableName(keyspace = keyspace, table = table.name)

def table(name: String, query: TableName => Nel[String]) = {
val tableName = TableName(keyspace = keyspace, table = name)
CreateTables.Table(name = name, queries = query(tableName))
}

val snapshot = table(config.snapshotTable, a => Nel.of(SnapshotStatements.createTable(a)))

val setting = table(config.settingTable, a => Nel.of(SettingStatements.createTable(a)))

val schema = SnapshotSchema(
snapshot = tableName(snapshot),
setting = tableName(setting))

if (config.autoCreate) {
for {
result <- createTables(keyspace, Nel.of(snapshot, setting))
} yield {
(schema, result)
}
} else {
(schema, false).pure[F]
}
}

for {
_ <- createKeyspace(config.keyspace)
result <- createTables1
} yield result
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.evolutiongaming.kafka.journal.snapshot.cassandra

import cats.Parallel
import cats.effect.kernel.Temporal
import cats.syntax.all._
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.journal.Origin
import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraCluster, CassandraConsistencyConfig, CassandraSession, CassandraSync, SettingsCassandra}

/** Creates a new schema */
object SetupSnapshotSchema {

def apply[F[_]: Temporal: Parallel: CassandraCluster: CassandraSession: LogOf](
config: SnapshotSchemaConfig,
origin: Option[Origin],
consistencyConfig: CassandraConsistencyConfig
): F[SnapshotSchema] = {

def createSchema(implicit cassandraSync: CassandraSync[F]) = CreateSnapshotSchema(config)

for {
cassandraSync <- CassandraSync.of[F](config.keyspace, config.locksTable, origin)
ab <- createSchema(cassandraSync)
(schema, fresh) = ab
_ <- SettingsCassandra.of[F](schema.setting, origin, consistencyConfig)
} yield schema
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.evolutiongaming.kafka.journal.eventual.cassandra
package com.evolutiongaming.kafka.journal.snapshot.cassandra

import cats.effect.kernel.{Async, Resource, Temporal}
import cats.effect.syntax.all._
Expand All @@ -7,6 +7,7 @@ import cats.{Monad, MonadThrow, Parallel}
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType
import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraCluster, CassandraConsistencyConfig, CassandraSession}
import com.evolutiongaming.scassandra.CassandraClusterOf

import java.time.Instant
Expand All @@ -31,13 +32,13 @@ object SnapshotCassandra {
}

def of[F[_]: Temporal: Parallel: CassandraCluster: CassandraSession: LogOf](
schemaConfig: SchemaConfig,
schemaConfig: SnapshotSchemaConfig,
origin: Option[Origin],
consistencyConfig: CassandraConsistencyConfig,
numberOfSnapshots: Int
): F[SnapshotStoreFlat[F]] =
for {
schema <- SetupSchema[F](schemaConfig, origin, consistencyConfig)
schema <- SetupSnapshotSchema[F](schemaConfig, origin, consistencyConfig)
statements <- Statements.of[F](schema, consistencyConfig)
} yield SnapshotCassandra(statements, numberOfSnapshots)

Expand Down Expand Up @@ -157,7 +158,7 @@ object SnapshotCassandra {
)

object Statements {
def of[F[_]: Monad: CassandraSession](schema: Schema, consistencyConfig: CassandraConsistencyConfig): F[Statements[F]] = {
def of[F[_]: Monad: CassandraSession](schema: SnapshotSchema, consistencyConfig: CassandraConsistencyConfig): F[Statements[F]] = {
for {
insertRecord <- SnapshotStatements.InsertRecord.of[F](schema.snapshot, consistencyConfig.write)
updateRecord <- SnapshotStatements.UpdateRecord.of[F](schema.snapshot, consistencyConfig.write)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.evolutiongaming.kafka.journal.eventual.cassandra
package com.evolutiongaming.kafka.journal.snapshot.cassandra

import com.datastax.driver.core.ConsistencyLevel
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraConsistencyConfig
import com.evolutiongaming.scassandra.{CassandraConfig, QueryConfig}
import pureconfig.ConfigReader
import pureconfig.generic.semiauto.deriveReader
Expand Down Expand Up @@ -32,7 +33,7 @@ final case class SnapshotCassandraConfig(
name = "snapshot",
query = QueryConfig(consistency = ConsistencyLevel.LOCAL_QUORUM, fetchSize = 1000, defaultIdempotence = true)
),
schema: SchemaConfig = SchemaConfig.default,
schema: SnapshotSchemaConfig = SnapshotSchemaConfig.default,
consistencyConfig: CassandraConsistencyConfig = CassandraConsistencyConfig.default
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.evolutiongaming.kafka.journal.snapshot.cassandra

import com.evolutiongaming.scassandra.TableName

final case class SnapshotSchema(
snapshot: TableName,
setting: TableName)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.evolutiongaming.kafka.journal.snapshot.cassandra

import com.evolutiongaming.kafka.journal.eventual.cassandra.KeyspaceConfig
import pureconfig.ConfigReader
import pureconfig.generic.semiauto.deriveReader

final case class SnapshotSchemaConfig(
keyspace: KeyspaceConfig = KeyspaceConfig.default,
snapshotTable: String = "snapshot_buffer",
settingTable: String = "setting",
locksTable: String = "locks",
autoCreate: Boolean = true
)

object SnapshotSchemaConfig {

val default: SnapshotSchemaConfig = SnapshotSchemaConfig()

implicit val configReaderSchemaConfig: ConfigReader[SnapshotSchemaConfig] = deriveReader

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.evolutiongaming.kafka.journal.eventual.cassandra
package com.evolutiongaming.kafka.journal.snapshot.cassandra

import cats.Monad
import cats.syntax.all._
import com.datastax.driver.core.Row
import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraHelper._
import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraConsistencyConfig, CassandraSession}
import com.evolutiongaming.scassandra.syntax._
import com.evolutiongaming.scassandra.{DecodeByName, EncodeByName, TableName}
import scodec.bits.ByteVector
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.evolutiongaming.kafka.journal.snapshot.cassandra

import cats.Id
import cats.data.{NonEmptyList => Nel}
import com.evolutiongaming.kafka.journal.eventual.cassandra.{CreateKeyspace, CreateTables, KeyspaceConfig}
import com.evolutiongaming.kafka.journal.snapshot.cassandra.{CreateSnapshotSchema, SnapshotSchemaConfig}
import com.evolutiongaming.scassandra.TableName
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

class CreateSchemaSpec extends AnyFunSuite with Matchers { self =>

test("create keyspace and tables") {
val config = SnapshotSchemaConfig.default
val createSchema = CreateSnapshotSchema[StateT](config, createKeyspace, createTables)
val initial = State.empty.copy(createTables = true)
val (state, (schema, fresh)) = createSchema.run(initial)
state shouldEqual initial.copy(actions = List(Action.CreateTables, Action.CreateKeyspace))
fresh shouldEqual true
schema shouldEqual self.schema
}

test("not create keyspace and tables") {
val config = SnapshotSchemaConfig.default.copy(autoCreate = false)
val createSchema = CreateSnapshotSchema[StateT](config, createKeyspace, createTables)
val initial = State.empty.copy(createTables = true)
val (state, (schema, fresh)) = createSchema.run(initial)
state shouldEqual initial.copy(actions = List(Action.CreateKeyspace))
fresh shouldEqual false
schema shouldEqual self.schema
}


private val schema = SnapshotSchema(
snapshot = TableName(keyspace = "journal", table = "snapshot_buffer"),
setting = TableName(keyspace = "journal", table = "setting"))

val createTables: CreateTables[StateT] = new CreateTables[StateT] {
def apply(keyspace: String, tables: Nel[CreateTables.Table]) = {
StateT { state =>
val state1 = state.add(Action.CreateTables)
(state1, state.createTables)
}
}
}

val createKeyspace: CreateKeyspace[StateT] = new CreateKeyspace[StateT] {
def apply(config: KeyspaceConfig) = {
StateT { state =>
val state1 = state.add(Action.CreateKeyspace)
(state1, ())
}
}
}


case class State(createTables: Boolean, actions: List[Action]) {

def add(action: Action): State = copy(actions = action :: actions)
}

object State {
val empty: State = State(createTables = false, actions = Nil)
}


type StateT[A] = cats.data.StateT[Id, State, A]

object StateT {
def apply[A](f: State => (State, A)): StateT[A] = cats.data.StateT[Id, State, A](f)
}


sealed trait Action extends Product

object Action {
case object CreateTables extends Action
case object CreateKeyspace extends Action
}
}
Loading

0 comments on commit 79daac1

Please sign in to comment.