
Cassandra snapshot store #532

Draft: wants to merge 65 commits into base: master

Commits (65)
14176a4
WIP: Snapshot statements.
rtar Apr 17, 2024
c14de14
Remove headers and tags.
rtar Feb 13, 2024
79330f2
Delete record instead of changing the status.
rtar Feb 13, 2024
e086b15
Add snapshots into schema creation.
rtar Feb 2, 2024
8b2e1a2
Experiment with API
rtar Feb 2, 2024
f34c307
Placeholder for a store implementation.
rtar Nov 7, 2023
7e2f415
Insert function (without retries)
rtar Feb 9, 2024
5874e6e
Add unsafe update function.
rtar Nov 8, 2023
f2d2be5
Add unit test for save and load.
rtar Nov 8, 2023
2906d24
Rename test to adhere to naming convention.
rtar Nov 8, 2023
495203f
Some working tests are added.
rtar Nov 8, 2023
a0723cd
Reflect potential concurrency issue in the test.
rtar Nov 9, 2023
587bbc6
Add naive drop implementation.
rtar Nov 9, 2023
0b71740
Add test for drop methods.
rtar Nov 10, 2023
a909f1d
Start working on snapshot adapter.
rtar Feb 15, 2024
4f12c01
Save should, probably, be idempotent.
rtar Nov 13, 2023
a9191ea
Avoid snapshot overwrite with IF/IF NOT EXISTS condition.
rtar Feb 1, 2024
bcb6a69
A bit more modern code style.
rtar Nov 14, 2023
c29df98
Trivial implementation of #save method.
rtar Nov 14, 2023
bb29464
Naive implementation of snapshot store adapter.
rtar Nov 15, 2023
5fa8754
Add snapshot constructor.
rtar Nov 15, 2023
855b545
Add constructor for SnapshotStoreAdapter.
rtar Nov 15, 2023
3304898
Add adapter factories.
rtar Nov 16, 2023
e5da5af
Add adapter instance.
rtar Nov 16, 2023
915f640
Implement snapshot store API.
rtar Nov 16, 2023
b73e848
Add integration test.
rtar Nov 16, 2023
63ecca5
Minimal implementation (TCK not passing yet)
rtar Nov 21, 2023
2518787
Update the snapshot with the same SeqNr according to TCK
rtar Nov 21, 2023
a878be4
Snapshot TCK is passing now.
rtar Nov 21, 2023
edc45a1
Enable running Cassandra before test runs.
rtar Nov 21, 2023
87d940d
Fix ordering issue found by performance tests.
rtar Nov 28, 2023
816c919
Snapshot store performance test.
rtar Nov 28, 2023
a282309
Add a performance test for Akka Persistence Cassandra to compare.
rtar Nov 30, 2023
4f1838d
Reduce number of configuration files.
rtar Dec 1, 2023
b35cd84
Use LWT when deleting a row.
rtar Dec 1, 2023
837a54c
Revert "Use LWT when deleting a row."
rtar Dec 1, 2023
5fdc2e9
Revert "Revert "Use LWT when deleting a row.""
rtar Dec 1, 2023
590b371
Fix broken test.
rtar Dec 1, 2023
d9a4753
Address review comments.
rtar Dec 1, 2023
6ebea73
Use a new table name to avoid conflict with Akka Persistence Cassandra.
rtar Dec 4, 2023
965f2e7
Remove segmentNr, because each set of snapshots per key is expected to
rtar Dec 4, 2023
f10b5cc
Fixed broken test.
rtar Dec 4, 2023
55a0493
Remove question mark for segmentNr.
rtar Dec 4, 2023
c0b4f88
Use correct table name.
rtar Feb 2, 2024
d5a946b
Use correct column index for bufferNr
rtar Dec 4, 2023
9a6d994
Separate CassandraSnapshotStoreConfig from KafkaJournalConfig.
rtar Dec 4, 2023
5e05686
Fix compilation issues after rebase
rtar Dec 18, 2023
edd6aad
Introduce "snapshot-cassandra" module.
rtar Apr 17, 2024
db90f9e
Remove unnecessary changes.
rtar Jan 25, 2024
e9babc9
Minimize number of changes.
rtar Jan 25, 2024
37ecc8c
Fixed file names.
rtar Feb 1, 2024
74345c6
Simplified method signatures.
rtar Feb 1, 2024
26fe488
Removed unnecessary copy-paste.
rtar Feb 1, 2024
8d2e676
Fixed issues caused by merge conflicts.
rtar Feb 2, 2024
d76d18e
Simplify CreateSnapshotSchema.
rtar Feb 2, 2024
9cce3d5
Remove obsolete test.
rtar Feb 2, 2024
4eb4bf8
Minimize number of changes.
rtar Feb 5, 2024
d9089a8
Remove option wrapper around a payload.
rtar Feb 5, 2024
a99b355
Add scaladoc for SnapshotStoreFlat.
rtar Feb 14, 2024
65697d9
Replace `drop` with `delete` to adhere to Akka naming convention.
rtar Feb 14, 2024
17807a6
Rename SnapshotStoreFlat to SnapshotStore.
rtar Feb 15, 2024
8fecd15
Add scaladoc for SnapshotSerializer.
rtar Feb 15, 2024
1a425e7
Fix compilation issues after rebase.
rtar Apr 17, 2024
adb39a9
Disable LWTs by default.
rtar May 8, 2024
c04b508
Print out fractional part of the duration.
rtar May 8, 2024
11 changes: 10 additions & 1 deletion build.sbt
@@ -175,7 +175,16 @@ lazy val `tests` = (project in file("tests")
Slf4j.`log4j-over-slf4j` % Test,
Logback.core % Test,
Logback.classic % Test,
scalatest % Test)))
scalatest % Test,
// these libraries are only needed for
// Akka Persistence Cassandra performance tests
Akka.`persistence-cassandra` % Test,
Akka.`akka-cluster` % Test,
Akka.`akka-cluster-tools` % Test,
Akka.`akka-coordination` % Test,
Akka.`akka-persistence-query` % Test,
Akka.`akka-pki` % Test,
Akka.`akka-remote` % Test)))

lazy val replicator = (Project("replicator", file("replicator"))
settings (name := "kafka-journal-replicator")
35 changes: 34 additions & 1 deletion persistence/src/main/resources/reference.conf
@@ -55,6 +55,39 @@ evolutiongaming.kafka-journal.persistence {
json-codec = default
}

snapshot {

class = "akka.persistence.kafka.journal.CassandraSnapshotStore"

plugin-dispatcher = "evolutiongaming.kafka-journal.persistence.dispatcher"

persistence-id-to-key {
impl = "constant-topic" # valid values: constant-topic, split
constant-topic {
topic = "journal"
}
}

cassandra {
client {
name = "journal"
query {
fetch-size = 1000
consistency = "LOCAL_QUORUM"
default-idempotence = true
}
}
}

circuit-breaker {
max-failures = 100
call-timeout = 130s // should be higher than producer.delivery-timeout
reset-timeout = 1m
}

json-codec = default
}

dispatcher {
type = "Dispatcher"
executor = "fork-join-executor"
@@ -64,4 +97,4 @@ evolutiongaming.kafka-journal.persistence {
parallelism-max = 32
}
}
}
}
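Taken together, the `snapshot` block above suggests the following minimal `application.conf` for an application enabling the store. The `akka.persistence.snapshot-store.plugin` wiring and the plugin path are assumptions inferred from the reference.conf layout, not confirmed by this PR:

```hocon
# application.conf sketch (plugin path assumed from the reference.conf layout above)
akka.persistence.snapshot-store.plugin = "evolutiongaming.kafka-journal.persistence.snapshot"

evolutiongaming.kafka-journal.persistence.snapshot {
  persistence-id-to-key {
    impl = "constant-topic" # or "split"
    constant-topic {
      topic = "journal"
    }
  }
  # tune Cassandra client settings if the defaults do not fit
  cassandra.client.query.consistency = "LOCAL_QUORUM"
}
```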
@@ -0,0 +1,201 @@
package akka.persistence.kafka.journal

import akka.actor.ActorSystem
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.{SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria}
import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.syntax.all._
import cats.effect.unsafe.{IORuntime, IORuntimeConfig}
import cats.syntax.all._
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper.{FromFuture, LogOf, ToFuture}
import com.evolutiongaming.kafka.journal.snapshot.cassandra.SnapshotCassandraConfig
import com.evolutiongaming.kafka.journal.util.CatsHelper._
import com.evolutiongaming.kafka.journal.util.PureConfigHelper._
import com.evolutiongaming.kafka.journal.{JsonCodec, LogOfFromAkka, Origin, Payload, SnapshotReadWrite}
import com.evolutiongaming.retry.Retry.implicits._
import com.evolutiongaming.retry.{OnError, Strategy}
import com.evolutiongaming.scassandra.CassandraClusterOf
import com.typesafe.config.Config
import pureconfig.ConfigSource

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor, Future}

class CassandraSnapshotStore(config: Config) extends SnapshotStore { actor =>

implicit val system: ActorSystem = context.system
implicit val executor: ExecutionContextExecutor = context.dispatcher

private val (blocking, blockingShutdown) = IORuntime.createDefaultBlockingExecutionContext("kafka-journal-blocking")
private val (scheduler, schedulerShutdown) = IORuntime.createDefaultScheduler("kafka-journal-scheduler")
implicit val ioRuntime: IORuntime = IORuntime(
compute = executor,
blocking = blocking,
scheduler = scheduler,
shutdown = () => {
blockingShutdown()
schedulerShutdown()
},
config = IORuntimeConfig()
)
implicit val toFuture: ToFuture[IO] = ToFuture.ioToFuture
implicit val fromFuture: FromFuture[IO] = FromFuture.lift[IO]

val adapter: Future[(SnapshotStoreAdapter[Future], IO[Unit])] =
adapterIO
.map { _.mapK(toFuture.toFunctionK, fromFuture.toFunctionK) }
.allocated
.toFuture

override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
adapter.flatMap { case (adapter, _) => adapter.load(persistenceId, criteria) }

override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
adapter.flatMap { case (adapter, _) => adapter.save(metadata, snapshot) }

override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
adapter.flatMap { case (adapter, _) => adapter.delete(metadata) }

override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
adapter.flatMap { case (adapter, _) => adapter.delete(persistenceId, criteria) }

override def postStop(): Unit = {
val future = adapter.flatMap { case (_, release) => release.toFuture }
Await.result(future, 1.minute)
super.postStop()
}

def adapterIO: Resource[IO, SnapshotStoreAdapter[IO]] = {
for {
snapshotSerializer <- serializer
config <- cassandraSnapshotStoreConfig.toResource
snapshotReadWrite <- snapshotReadWrite(config).toResource
adapter <- adapterIO(config, snapshotSerializer, snapshotReadWrite)
} yield adapter
}

def adapterIO[A](
snapshotSerializer: SnapshotSerializer[IO, A],
snapshotReadWrite: SnapshotReadWrite[IO, A]
): Resource[IO, SnapshotStoreAdapter[IO]] = {
for {
config <- cassandraSnapshotStoreConfig.toResource
adapter <- adapterIO(config, snapshotSerializer, snapshotReadWrite)
} yield adapter
}

def adapterIO[A](
config: CassandraSnapshotStoreConfig,
snapshotSerializer: SnapshotSerializer[IO, A],
snapshotReadWrite: SnapshotReadWrite[IO, A]
): Resource[IO, SnapshotStoreAdapter[IO]] = {
for {
logOf <- logOf
log <- logOf(classOf[KafkaJournal]).toResource
_ <- log.debug(s"config: $config").toResource
adapter <- Resource {
val adapter = for {
toKey <- toKey
origin <- origin.toResource
cassandraClusterOf <- cassandraClusterOf
adapter <- adapterOf(
toKey = toKey,
origin = origin,
snapshotSerializer = snapshotSerializer,
snapshotReadWrite = snapshotReadWrite,
config = config.cassandra,
cassandraClusterOf = cassandraClusterOf
)(logOf = logOf)
} yield adapter
val strategy = Strategy.fibonacci(100.millis).cap(config.startTimeout)
val onError: OnError[IO, Throwable] = { (error, status, decision) =>
{
decision match {
case OnError.Decision.Retry(delay) =>
log.warn(s"allocate failed, retrying in $delay, error: $error")

case OnError.Decision.GiveUp =>
val retries = status.retries
val duration = status.delay
log.error(s"allocate failed after $retries retries within $duration: $error", error)
}
}
}
adapter.allocated
.retry(strategy, onError)
.timeout(config.startTimeout)
.map { case (adapter, release0) =>
val release = release0
.timeout(config.startTimeout)
.handleErrorWith { error => log.error(s"release failed with $error", error) }
(adapter, release)
}
}
} yield adapter
}

def adapterOf[A](
toKey: ToKey[IO],
origin: Option[Origin],
snapshotSerializer: SnapshotSerializer[IO, A],
snapshotReadWrite: SnapshotReadWrite[IO, A],
config: SnapshotCassandraConfig,
cassandraClusterOf: CassandraClusterOf[IO]
)(implicit logOf: LogOf[IO]): Resource[IO, SnapshotStoreAdapter[IO]] =
SnapshotStoreAdapter.of[IO, A](
toKey = toKey,
origin = origin,
snapshotSerializer = snapshotSerializer,
snapshotReadWrite = snapshotReadWrite,
config = config,
cassandraClusterOf = cassandraClusterOf
)

def toKey: Resource[IO, ToKey[IO]] =
ToKey.fromConfig[IO](config).pure[Resource[IO, *]]

def origin: IO[Option[Origin]] = {
val hostName = Origin.hostName[IO]
def akkaHost = Origin.akkaHost[IO](system)
def akkaName = Origin.akkaName(system)
hostName.toOptionT
.orElse(akkaHost.toOptionT)
.orElse(akkaName.some.toOptionT[IO])
.value
}

def cassandraSnapshotStoreConfig: IO[CassandraSnapshotStoreConfig] =
ConfigSource
.fromConfig(config)
.load[CassandraSnapshotStoreConfig]
.liftTo[IO]

def serializer: Resource[IO, SnapshotSerializer[IO, Payload]] =
SnapshotSerializer.of[IO](system).toResource

def snapshotReadWrite(config: CassandraSnapshotStoreConfig): IO[SnapshotReadWrite[IO, Payload]] =
for {
jsonCodec <- jsonCodec(config)
} yield {
implicit val jsonCodec1 = jsonCodec
SnapshotReadWrite.of[IO, Payload]
}

def jsonCodec(config: CassandraSnapshotStoreConfig): IO[JsonCodec[IO]] = {
val codec: JsonCodec[IO] = config.jsonCodec match {
case KafkaJournalConfig.JsonCodec.Default => JsonCodec.default
case KafkaJournalConfig.JsonCodec.PlayJson => JsonCodec.playJson
case KafkaJournalConfig.JsonCodec.Jsoniter => JsonCodec.jsoniter
}
codec.pure[IO]
}

def cassandraClusterOf: Resource[IO, CassandraClusterOf[IO]] =
CassandraClusterOf.of[IO].toResource

def logOf: Resource[IO, LogOf[IO]] =
LogOfFromAkka[IO](system).pure[Resource[IO, *]]

}
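The retry wiring in `adapterIO` uses `Strategy.fibonacci(100.millis).cap(config.startTimeout)` from `com.evolutiongaming.retry`, i.e. Fibonacci-spaced delays that never exceed the start timeout. A stdlib-only sketch of that delay schedule (illustrative only, not the library's implementation; `FibonacciBackoff` and its parameters are hypothetical names):

```scala
object FibonacciBackoff {

  /** First `n` retry delays in millis: a Fibonacci sequence seeded with
    * `baseMillis`, each delay capped at `capMillis` (illustrative sketch). */
  def delays(baseMillis: Long, capMillis: Long, n: Int): List[Long] =
    Iterator
      .iterate((baseMillis, baseMillis)) { case (a, b) => (b, a + b) }
      .map { case (a, _) => a min capMillis }
      .take(n)
      .toList
}
```

With a 100 ms base and a 400 ms cap the schedule is 100, 100, 200, 300, 400, … ms: retries back off quickly at first and then settle at the cap, which is why `timeout(config.startTimeout)` above still bounds the overall allocation attempt.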
@@ -0,0 +1,56 @@
package akka.persistence.kafka.journal

import com.evolutiongaming.kafka.journal.snapshot.cassandra.SnapshotCassandraConfig
import pureconfig.generic.semiauto.deriveReader
import pureconfig.{ConfigCursor, ConfigReader, ConfigSource}

import scala.concurrent.duration._

/** Configuration for [[CassandraSnapshotStore]].
*
* This case class specifies configuration that could be set using `application.conf` (see `reference.conf` for an
* example of such configuration).
*
* @param cassandra
* Cassandra-specific configuration used by a plugin.
* @param startTimeout
* The timeout to create a snapshot store adapter. Starting the store involves some effectful steps, such as creating a
* Cassandra session, so, in case of infrastructure or configuration troubles, it could take a longer time. Creating
* the store will fail with [[TimeoutException]] if it takes longer than `startTimeout`.
* @param stopTimeout
* This is meant to be a counterpart to `startTimeout`, allowing resource release to timeout with an error. This
* parameter is not used, for now, and `startTimeout` is used instead.
* @param jsonCodec
* JSON codec to use for (de)serialization of the snapshots from [[scodec.bits.ByteVector]] to
* [[play.api.libs.json.JsValue]] and vice versa. This parameter is only relevant if the default [[CassandraSnapshotStore]]
* serialization is used, i.e. it is not taken into account if Circe JSON or other custom serialization is used.
*
* @see
* [[KafkaJournal]] for more details.
*/
final case class CassandraSnapshotStoreConfig(
cassandra: SnapshotCassandraConfig = SnapshotCassandraConfig.default,
startTimeout: FiniteDuration = 1.minute,
stopTimeout: FiniteDuration = 1.minute,
jsonCodec: KafkaJournalConfig.JsonCodec = KafkaJournalConfig.JsonCodec.Default
)

object CassandraSnapshotStoreConfig {

val default: CassandraSnapshotStoreConfig = CassandraSnapshotStoreConfig()

implicit val configReaderKafkaJournalConfig: ConfigReader[CassandraSnapshotStoreConfig] = {

val configReader = deriveReader[CassandraSnapshotStoreConfig]

cursor: ConfigCursor => {
for {
cursor <- cursor.asObjectCursor
config = cursor.objValue.toConfig
source = ConfigSource.fromConfig(config)
config <- source.load(configReader)
} yield config
}
}

}
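Since `configReaderKafkaJournalConfig` is derived with pureconfig's `deriveReader`, the HOCON keys would normally follow its kebab-case convention. A sketch of the corresponding settings block (key names are an assumption based on that convention, not confirmed by this PR):

```hocon
# Overrides for CassandraSnapshotStoreConfig (defaults shown; kebab-case
# key names assumed from pureconfig's standard derivation)
start-timeout = 1m
stop-timeout  = 1m   # currently unused; start-timeout is applied instead
json-codec    = default  # or: play-json, jsoniter
cassandra {
  # SnapshotCassandraConfig fields go here
}
```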