Cassandra snapshot store #532

Draft, wants to merge 65 commits into base: master

@rtar rtar commented Nov 1, 2023

It is kind of a draft for now, but any feedback is welcome.

The Idea

The big idea is to remove the sequence_nr column from the primary key. In Akka Persistence Cassandra, the presence of this column means a tombstone is created each time a snapshot is deleted, and the tombstone stays there until gc_grace_seconds elapses. This slows down reads and may limit the ability to scan the table when an incident happens.

The new implementation keeps a fixed maximum number of rows per persistenceId and updates them in place using a ring buffer approach.
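
To make the ring buffer idea concrete, here is a minimal in-memory sketch. It is not the code from this PR: `RingBufferSketch`, `Snapshot`, `Buffer` and the slot count of 3 are hypothetical names and values chosen for illustration, and a plain `Map` stands in for the fixed set of rows per persistenceId.

```scala
object RingBufferSketch {

  // Hypothetical simplified snapshot: the real record carries more fields.
  final case class Snapshot(seqNr: Long, payload: String)

  // `slots` maps a slot index to the snapshot stored there;
  // it stands in for the fixed set of rows kept per persistenceId.
  final case class Buffer(size: Int, slots: Map[Int, Snapshot]) {
    require(size > 0, "buffer must have at least one slot")

    // Pick a free slot if one exists, otherwise the slot holding the oldest snapshot.
    def slotFor: Int = {
      val free = (0 until size).find(i => !slots.contains(i))
      free.getOrElse(slots.minBy { case (_, s) => s.seqNr }._1)
    }

    // Saving overwrites the chosen slot in place,
    // so the number of stored snapshots never exceeds `size`.
    def save(snapshot: Snapshot): Buffer =
      copy(slots = slots.updated(slotFor, snapshot))

    def latest: Option[Snapshot] =
      slots.values.maxByOption(_.seqNr)
  }

  val empty: Buffer = Buffer(size = 3, slots = Map.empty)
}
```

Saving five snapshots into this three-slot buffer leaves only the three newest ones, and no slot is ever deleted, only overwritten.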

Why part of Kafka Journal, not a separate library?

The goal is to reuse Kafka Journal infrastructure:

  1. Converting from persistenceId to (id, topic) pair and back,
  2. Reusing serialization approach,
  3. Reusing Cassandra specific utility classes from eventual-cassandra.

How to review?

In this specific PR there are only two classes that add real value:

  • SnapshotStatements defines the new table used to store snapshots, and the CQL statements to get/update/delete them,
  • SnapshotCassandra defines the actual ring buffer logic to insert/update/delete snapshots in the new table.

You might also find SnapshotCassandraTest useful, as it defines the use cases I had in mind when writing SnapshotCassandra, and SnapshotPerfSpec, which demonstrates that the new plugin is indeed faster when reading and slower when writing snapshots.

Everything else is just wrappers around the Akka SnapshotStore API without much added-value code.

Why is it a draft and not ready for merge?

I do not want to break the existing kafka-journal, obviously, so I tried to change as little as possible in the journal plugin, but I am not sure I fully succeeded. E.g., I had to squeeze code into the CreateSchema and SchemaConfig classes, which creates a new snapshot table if it does not exist.

I am not sure it is the right thing to do, because most people won't use the new snapshotter for a long time.

Besides that, to sell something like this to users, I should not increase the complexity of kafka-journal, so separating the code from kafka-journal into a separate subproject and documenting everything well could be essential.

Last but not least, I kind of hacked the serialization part in, and it still needs to be checked / worked on.

The biggest obstacle is how to achieve these goals without making the changelist huge and unreviewable. I am playing with separating the code here (it is a PR to this PR), but I am not completely sure I am going in the right direction: https://github.com/evolution-gaming/kafka-journal/pull/537/files

coveralls commented Nov 1, 2023

Pull Request Test Coverage Report for Build 9712199194

Details

  • 295 of 323 (91.33%) changed or added relevant lines in 12 files are covered.
  • 12 unchanged lines in 10 files lost coverage.
  • Overall coverage increased (+1.1%) to 82.287%

| Changes Missing Coverage | Covered Lines | Changed/Added Lines | % |
| --- | --- | --- | --- |
| snapshot/src/main/scala/com/evolutiongaming/kafka/journal/SnapshotStoreError.scala | 0 | 1 | 0.0% |
| snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandra.scala | 55 | 58 | 94.83% |
| persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotSerializer.scala | 28 | 32 | 87.5% |
| snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotStatements.scala | 79 | 84 | 94.05% |
| persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStore.scala | 57 | 72 | 79.17% |

| Files with Coverage Reduction | New Missed Lines | % |
| --- | --- | --- |
| persistence/src/main/scala/akka/persistence/kafka/journal/KafkaJournal.scala | 1 | 84.76% |
| core/src/main/scala/com/evolutiongaming/kafka/journal/LogFromAkka.scala | 1 | 28.57% |
| journal/src/main/scala/com/evolutiongaming/kafka/journal/HeadCacheConsumption.scala | 1 | 96.43% |
| journal/src/main/scala/com/evolutiongaming/kafka/journal/Events.scala | 1 | 90.0% |
| core/src/main/scala/com/evolutiongaming/kafka/journal/util/ActorSystemOf.scala | 1 | 83.33% |
| core/src/main/scala/com/evolutiongaming/kafka/journal/util/GracefulFiber.scala | 1 | 75.0% |
| core/src/main/scala/com/evolutiongaming/kafka/journal/SeqNr.scala | 1 | 89.74% |
| journal/src/main/scala/com/evolutiongaming/kafka/journal/Journals.scala | 1 | 84.43% |
| core/src/main/scala/com/evolutiongaming/kafka/journal/Key.scala | 1 | 73.33% |
| journal/src/main/scala/com/evolutiongaming/kafka/journal/HeadCache.scala | 3 | 86.67% |

| Totals | Coverage Status |
| --- | --- |
| Change from base Build 9680733046: | 1.1% |
| Covered Lines: | 3670 |
| Relevant Lines: | 4460 |

💛 - Coveralls

@rtar rtar changed the title from "Snapshot statements" to "Cassandra snapshot store" Nov 21, 2023
def listOf(size: Int): List[BufferNr] =
  (0 until size).toList.map(fromIntUnsafe)

def fromIntUnsafe(value: Int): BufferNr =
Contributor

  1. some validation?
  2. non negative?
  3. where is safe version? :)
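
One way the three points above could be addressed is sketched below. This is only an illustration, not the fix that landed in the PR: the shape of `BufferNr`, the `of` constructor returning `Either[String, BufferNr]`, and the error message are all assumptions.

```scala
object BufferNrSketch {

  // Sealed abstract case class: no public `apply` or `copy` is generated,
  // so every instance has to come from the companion object below.
  sealed abstract case class BufferNr(value: Int)

  object BufferNr {

    // Safe constructor (hypothetical): rejects negative indices instead of throwing.
    def of(value: Int): Either[String, BufferNr] =
      if (value >= 0) Right(new BufferNr(value) {})
      else Left(s"invalid BufferNr of $value, it must be greater or equal to 0")

    // Unsafe variant for values already known to be valid, e.g. a generated range.
    def fromIntUnsafe(value: Int): BufferNr =
      of(value).fold(msg => throw new IllegalArgumentException(msg), identity)

    def listOf(size: Int): List[BufferNr] =
      (0 until size).toList.map(fromIntUnsafe)
  }
}
```

With this shape, `listOf` can keep using the unsafe variant because its inputs come from a non-negative range, while external input goes through `of`.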

Contributor Author

Fixed

@@ -34,10 +34,17 @@ object CassandraHelper {
}


implicit class RowOps(val self: Row) extends AnyVal {

def wasApplied: Boolean = self.getBool("[applied]")
Contributor

isApplied ?

Contributor Author

I named it wasApplied because that is what it is called in the Cassandra driver, so I thought it might ease discoverability a bit. But isApplied sounds fine too.

https://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/ResultSet.html#wasApplied--

@@ -12,6 +12,7 @@ final case class SchemaConfig(
metaJournalTable: String = "metajournal",
pointerTable: String = "pointer",
pointer2Table: String = "pointer2",
snapshotTable: String = "snapshot",
Contributor

Just a note: Akka currently uses a snapshots table. We need to be 100% sure we do not conflict with it :)

Contributor Author

I did not think about it, to be honest. Another name is required here, indeed.

Contributor Author

How about "snapshot_buffer"?


import java.time.Instant

final case class SnapshotSelectionCriteria(
Contributor

do you think we need this?

Contributor Author

It provides a bit of additional typesafety on top of the original structure.

I can move these parameters to the method arguments though.

Here is the only place it is used: https://github.com/evolution-gaming/kafka-journal/pull/532/files#diff-9ccd7e724e3622d863ea66a0e6c83a3d5bf96a6d0d089ad1235680ca6266556aR114

val (bufferNr, (deleteSnapshot, _)) = oldestSnapshot
val wasApplied = statements.updateRecord(key, segmentNr, bufferNr, insertSnapshot, deleteSnapshot)
wasApplied.flatMap { wasApplied =>
  // TODO: consider adding circuit breaker here
Contributor

could you please explain this comment?

Contributor Author

If there is a bug or high contention (with the writer itself?), this call may lead to an infinite loop, which feels like a thing to consider protecting against.
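
The simplest protection against such a loop is a bounded retry, which is weaker than a real circuit breaker but already rules out spinning forever. The sketch below is synchronous and hypothetical: `saveWithLimit`, `maxAttempts` and the `attempt` callback (standing in for one "select metadata, then conditional update" round) are not part of the PR, which works in an effect type `F` instead.

```scala
object BoundedRetrySketch {

  // `attempt` returns true when the conditional update was applied.
  // The result is the number of the attempt that succeeded,
  // or an error once `maxAttempts` rounds have failed.
  def saveWithLimit(maxAttempts: Int)(attempt: () => Boolean): Either[String, Int] = {
    @scala.annotation.tailrec
    def loop(attemptNr: Int): Either[String, Int] =
      if (attemptNr > maxAttempts) Left(s"update was not applied after $maxAttempts attempts")
      else if (attempt()) Right(attemptNr)
      else loop(attemptNr + 1)

    loop(1)
  }
}
```

A caller would then decide whether a `Left` should fail the save or only be reported.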

val segmentNr = SegmentNr.min

def save(key: Key, snapshot: SnapshotRecord[EventualPayloadAndType]): F[Unit] = {
  statements.selectMetadata(key, segmentNr).flatMap {
Contributor

On the hot path, given that the buffer is full, do you need to query ALL snapshots?

Contributor Author

I need this information to find the oldest snapshot to be replaced. There might be ways to optimize it, though. 🤔

(bufferNr, (seqNr, timestamp))
}

rows.toList.map(_.toMap)
Contributor

minor, not sure toMap should be part of statement :)

savedSnapshots: Map[BufferNr, (SeqNr, Instant)],
insertSnapshot: SnapshotRecord[EventualPayloadAndType]
): F[Unit] = {
val sortedSnapshots = savedSnapshots.toList.sortBy { case (_, (seqNr, timestamp)) => (seqNr, timestamp) }
Contributor

no need to sort collection, just minBy

Contributor

is it correct to include timestamp here?

Contributor Author

Technically, it should never happen, i.e. there should not be two snapshots with the same seqNr. 🤔
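
For reference, the two ways of finding the oldest snapshot discussed in this thread can be compared side by side. In this sketch plain `Int` and `Long` stand in for the PR's `BufferNr` and `SeqNr` types (an assumption made to keep it self-contained), and `minByOption` requires Scala 2.13 or later.

```scala
import java.time.Instant

object OldestSnapshotSketch {

  // Slot index -> (seqNr, timestamp), mirroring Map[BufferNr, (SeqNr, Instant)].
  type Saved = Map[Int, (Long, Instant)]

  // Sorts the whole collection just to take its first element.
  def oldestBySort(saved: Saved): Option[Int] =
    saved.toList
      .sortBy { case (_, (seqNr, timestamp)) => (seqNr, timestamp) }
      .headOption
      .map(_._1)

  // Single pass, same ordering; the timestamp only matters when seqNr values tie.
  def oldestByMin(saved: Saved): Option[Int] =
    saved
      .minByOption { case (_, (seqNr, timestamp)) => (seqNr, timestamp) }
      .map(_._1)
}
```

Both functions agree on the result; the timestamp acts purely as a tie-breaker for the case that should never happen.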

@rtar rtar force-pushed the snapshot branch 3 times, most recently from 7af15e7 to 6a6da8e Compare December 18, 2023 14:07
@rtar rtar force-pushed the snapshot branch 2 times, most recently from 611ff72 to 05e761a Compare January 17, 2024 12:05
@rtar rtar force-pushed the snapshot branch 2 times, most recently from 40f0178 to e46f1ed Compare January 25, 2024 12:46
@rtar rtar added this to the snapshotter milestone Jan 25, 2024
@rtar rtar force-pushed the snapshot branch 2 times, most recently from 64b3b10 to 70adbd5 Compare February 1, 2024 15:14
savedSnapshots: Map[BufferNr, (SeqNr, Instant)],
insertSnapshot: SnapshotRecord[EventualPayloadAndType]
): F[Unit] = {
val oldestSnapshot = savedSnapshots.toList.minByOption { case (_, (seqNr, timestamp)) => (seqNr, timestamp) }
Contributor

Are you expecting to have several snapshots with the same seqNr? I do not understand the use case for it, could you please share more details?

Contributor Author

There should never be several snapshots with the same seqNr, but it seemed like a good idea to be a little bit defensive here.

I.e., if there is a bug or something, it will still order by timestamp and delete the oldest one, not a random one.

Contributor

Would it be possible to log (or report in any other way) the case of a duplicated seqNr? Please resolve the conversation if it's not easily possible.
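
Detecting the duplicated seqNr case from the metadata that is already loaded could look roughly like this. It is a sketch under assumptions: `Int` and `Long` stand in for `BufferNr` and `SeqNr`, `duplicates` is a hypothetical helper, and how the result gets logged is left to the caller.

```scala
object DuplicateSeqNrSketch {

  // Returns every seqNr stored in more than one buffer slot,
  // together with the (sorted) slots that hold it.
  def duplicates(saved: Map[Int, Long]): Map[Long, List[Int]] =
    saved.toList
      .groupMap { case (_, seqNr) => seqNr } { case (bufferNr, _) => bufferNr }
      .filter { case (_, slots) => slots.size > 1 }
      .map { case (seqNr, slots) => seqNr -> slots.sorted }
}
```

A non-empty result is the condition worth logging before the oldest slot is overwritten.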

@rtar rtar force-pushed the snapshot branch 2 times, most recently from f14d9ac to 09c9161 Compare February 2, 2024 17:30
@rtar rtar force-pushed the snapshot branch 5 times, most recently from 81de6f3 to 3dcd233 Compare February 15, 2024 09:33
@rtar rtar force-pushed the snapshot branch 2 times, most recently from ae02bdc to 4337c0a Compare February 21, 2024 13:17
@rtar rtar force-pushed the snapshot branch 2 times, most recently from d9579f6 to 6c4f376 Compare April 19, 2024 09:39
rtar added 29 commits June 28, 2024 14:35
# Conflicts:
#	eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/CreateSchemaSpec.scala
# Conflicts:
#	eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/CreateSchema.scala
#	eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/CreateSchemaSpec.scala

# Conflicts:
#	build.sbt

# Conflicts:
#	build.sbt
It was inherited from original `Event` code, but it seems that it
was only needed there to ensure backwards compatibility when introducing
JSON payloads and no longer necessary.

See
c6d0543
for more details.
The reason behind this is that there won't be a flat counterpart of the
store. It will be the only one.
4 participants