Skip to content

Commit

Permalink
Disable LWTs by default.
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed May 8, 2024
1 parent eee311e commit 789fce0
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.evolutiongaming.kafka.journal.snapshot.cassandra
import cats.effect.kernel.{Async, Resource, Temporal}
import cats.effect.syntax.all._
import cats.syntax.all._
import cats.{Monad, MonadThrow, Parallel}
import cats.{MonadThrow, Parallel}
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.cassandra.CassandraConsistencyConfig
Expand All @@ -22,7 +22,7 @@ object SnapshotCassandra {
): Resource[F, SnapshotStore[F]] = {

def store(implicit cassandraCluster: CassandraCluster[F], cassandraSession: CassandraSession[F]) =
of(config.schema, origin, config.consistencyConfig, config.numberOfSnapshots)
of(config.schema, origin, config.consistencyConfig, config.numberOfSnapshots, config.useLWT)

for {
cassandraCluster <- CassandraCluster.of[F](config.client, cassandraClusterOf, config.retries)
Expand All @@ -36,11 +36,12 @@ object SnapshotCassandra {
schemaConfig: SnapshotSchemaConfig,
origin: Option[Origin],
consistencyConfig: CassandraConsistencyConfig,
numberOfSnapshots: Int
numberOfSnapshots: Int,
useLWT: Boolean
): F[SnapshotStore[F]] =
for {
schema <- SetupSnapshotSchema[F](schemaConfig, origin, consistencyConfig)
statements <- Statements.of[F](schema, consistencyConfig)
statements <- Statements.of[F](schema, consistencyConfig, useLWT)
} yield SnapshotCassandra(statements, numberOfSnapshots)

private sealed abstract class Main
Expand Down Expand Up @@ -159,13 +160,13 @@ object SnapshotCassandra {
)

object Statements {
def of[F[_]: Monad: CassandraSession](schema: SnapshotSchema, consistencyConfig: CassandraConsistencyConfig): F[Statements[F]] = {
def of[F[_]: MonadThrow: CassandraSession](schema: SnapshotSchema, consistencyConfig: CassandraConsistencyConfig, useLWT: Boolean): F[Statements[F]] = {
for {
insertRecord <- SnapshotStatements.InsertRecord.of[F](schema.snapshot, consistencyConfig.write)
updateRecord <- SnapshotStatements.UpdateRecord.of[F](schema.snapshot, consistencyConfig.write)
selectRecord <- SnapshotStatements.SelectRecord.of[F](schema.snapshot, consistencyConfig.read)
selectMetadata <- SnapshotStatements.SelectMetadata.of[F](schema.snapshot, consistencyConfig.read)
deleteRecords <- SnapshotStatements.Delete.of[F](schema.snapshot, consistencyConfig.write)
insertRecord <- SnapshotStatements.InsertRecord.of[F](schema.snapshot, consistencyConfig.write, useLWT)
updateRecord <- SnapshotStatements.UpdateRecord.of[F](schema.snapshot, consistencyConfig.write, useLWT)
selectRecord <- SnapshotStatements.SelectRecord.of[F](schema.snapshot, consistencyConfig.read, useLWT)
selectMetadata <- SnapshotStatements.SelectMetadata.of[F](schema.snapshot, consistencyConfig.read, useLWT)
deleteRecords <- SnapshotStatements.Delete.of[F](schema.snapshot, consistencyConfig.write, useLWT)
} yield Statements(insertRecord, updateRecord, selectRecord, selectMetadata, deleteRecords)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import pureconfig.generic.semiauto.deriveReader
/** Cassandra-specific configuration used by a plugin.
*
* Specifies long time storage configuration and Cassandra client parameters.
*
* Note: if `useLWT` is set to `true`, then `consistencyConfig.read` should be set to `ConsistencyLevel.SERIAL` or
* `ConsistencyLevel.LOCAL_SERIAL`. Otherwise, the plugin will throw an exception to prevent data corruption.
*
* @param retries
* Number of retries in [[com.evolutiongaming.scassandra.NextHostRetryPolicy]]. It will retry doing a request on the
Expand All @@ -25,6 +28,10 @@ import pureconfig.generic.semiauto.deriveReader
* @param consistencyConfig
* Consistency levels to use for read and for write statements to Cassandra. The main reason one may be interested to
* change it, is for integration tests with small number of Cassandra nodes.
* @param useLWT
* Use Cassandra LWTs to ensure the older snapshots of one writer do not overwrite the newer snapshots of another.
* It is recommended to set it to `false` and rely on external mechanism to ensure there is only a single writer
* (such as Akka Persistence).
*/
final case class SnapshotCassandraConfig(
retries: Int = 100,
Expand All @@ -34,7 +41,8 @@ final case class SnapshotCassandraConfig(
query = QueryConfig(consistency = ConsistencyLevel.LOCAL_QUORUM, fetchSize = 1000, defaultIdempotence = true)
),
schema: SnapshotSchemaConfig = SnapshotSchemaConfig.default,
consistencyConfig: CassandraConsistencyConfig = CassandraConsistencyConfig.default
consistencyConfig: CassandraConsistencyConfig = CassandraConsistencyConfig.default,
useLWT: Boolean = false
)

object SnapshotCassandraConfig {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.evolutiongaming.kafka.journal.snapshot.cassandra

import cats.Monad
import cats.syntax.all._
import cats.{Monad, MonadThrow}
import com.datastax.driver.core.Row
import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.cassandra.CassandraConsistencyConfig
Expand Down Expand Up @@ -42,7 +42,8 @@ object SnapshotStatements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: CassandraConsistencyConfig.Write,
useLWT: Boolean
): F[InsertRecord[F]] = {

implicit val encodeByNameByteVector: EncodeByName[ByteVector] =
Expand All @@ -63,7 +64,7 @@ object SnapshotStatements {
|payload_bin,
|metadata)
|VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|IF NOT EXISTS
|${if (useLWT) "IF NOT EXISTS" else ""}
|""".stripMargin

for {
Expand All @@ -90,7 +91,12 @@ object SnapshotStatements {

val statement = statementOf(snapshot)
val row = statement.first
row.map(_.fold(false)(_.wasApplied))

if (useLWT) {
row.map(_.fold(false)(_.wasApplied))
} else {
row.as(true)
}
}
}
}
Expand All @@ -108,7 +114,8 @@ object SnapshotStatements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: CassandraConsistencyConfig.Write,
useLWT: Boolean
): F[UpdateRecord[F]] = {

implicit val encodeByNameByteVector: EncodeByName[ByteVector] =
Expand All @@ -128,7 +135,7 @@ object SnapshotStatements {
|WHERE id = :id
|AND topic = :topic
|AND buffer_idx = :buffer_idx
|IF seq_nr = :delete_seq_nr
|${if (useLWT) "IF seq_nr = :delete_seq_nr" else ""}
|""".stripMargin

for {
Expand All @@ -144,7 +151,7 @@ object SnapshotStatements {
.encode(key)
.encode(bufferNr)
.encode("insert_seq_nr", snapshot.seqNr)
.encode("delete_seq_nr", deleteSnapshot)
.encodeSome("delete_seq_nr", Option.when(useLWT)(deleteSnapshot))
.encode("timestamp", record.timestamp)
.encodeSome(record.origin)
.encodeSome(record.version)
Expand All @@ -156,7 +163,12 @@ object SnapshotStatements {

val statement = statementOf(insertSnapshot)
val row = statement.first
row.map(_.fold(false)(_.wasApplied))

if (useLWT) {
row.map(_.fold(false)(_.wasApplied))
} else {
row.as(true)
}
}
}
}
Expand All @@ -167,9 +179,10 @@ object SnapshotStatements {

object SelectMetadata {

def of[F[_]: Monad: CassandraSession](
def of[F[_]: MonadThrow: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: CassandraConsistencyConfig.Read,
useLWT: Boolean
): F[SelectMetadata[F]] = {

val query =
Expand All @@ -183,6 +196,9 @@ object SnapshotStatements {
|""".stripMargin

for {
_ <- MonadThrow[F].raiseWhen(useLWT && !consistencyConfig.value.isSerial) {
new IllegalArgumentException("consistencyConfig should be set to SERIAL or LOCAL_SERIAL when useLWT = true")
}
prepared <- query.prepare
} yield { key =>
val bound = prepared
Expand Down Expand Up @@ -212,9 +228,10 @@ object SnapshotStatements {

object SelectRecord {

def of[F[_]: Monad: CassandraSession](
def of[F[_]: MonadThrow: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: CassandraConsistencyConfig.Read,
useLWT: Boolean
): F[SelectRecord[F]] = {

implicit val decodeByNameByteVector: DecodeByName[ByteVector] =
Expand All @@ -237,6 +254,9 @@ object SnapshotStatements {
|""".stripMargin

for {
_ <- MonadThrow[F].raiseWhen(useLWT && !consistencyConfig.value.isSerial) {
new IllegalArgumentException("consistencyConfig should be set to SERIAL or LOCAL_SERIAL when useLWT = true")
}
prepared <- query.prepare
} yield { (key, bufferNr) =>
def readPayload(row: Row): EventualPayloadAndType = {
Expand Down Expand Up @@ -282,15 +302,20 @@ object SnapshotStatements {

object Delete {

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write): F[Delete[F]] = {
def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write,
useLWT: Boolean
): F[Delete[F]] = {

val query =
s"""
|DELETE FROM ${name.toCql}
|WHERE id = ?
|AND topic = ?
|AND buffer_idx = ?
|IF EXISTS""".stripMargin
|${if (useLWT) "IF EXISTS" else ""}
|""".stripMargin

for {
prepared <- query.prepare
Expand All @@ -301,7 +326,12 @@ object SnapshotStatements {
.encode(bufferNr)
.setConsistencyLevel(consistencyConfig.value)
.first
row.map(_.fold(false)(_.wasApplied))

if (useLWT) {
row.map(_.fold(false)(_.wasApplied))
} else {
row.as(true)
}
}
}
}
Expand Down

0 comments on commit 789fce0

Please sign in to comment.