Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka-free event-sourced persistence #269

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
08e5725
implement akka-free (persistence) recovery API
Nov 7, 2023
23a1177
fix 2.12 compile err
Nov 8, 2023
3d8795e
drop snapshot criteria from new ESS API
Nov 14, 2023
a3140b3
drop snapshot criteria
Nov 14, 2023
b7ae06e
wip: implement event-sourced (persistent) actor
Nov 21, 2023
3d3d1ee
wip: track seqNr in Append
Nov 21, 2023
d3f7f5c
wip: add docs & fix compile err
Nov 22, 2023
f5079cd
wip: implement EventSourcedStore creation from Akka persistent/snapsh…
Nov 22, 2023
36f76fb
wip: add basic EventSourcedActorOf test
Nov 23, 2023
d2490c3
wip: non-func improvement
Nov 23, 2023
2bf7580
wip: add more tests
Nov 27, 2023
16e0b88
wip: cleanup non-related changes
Nov 27, 2023
421f89c
wip: fix compilation of 2.12
Nov 27, 2023
878a64f
wip
Nov 28, 2023
8dff02e
wip: implement PersistenceAdapter
Nov 30, 2023
e7c54ad
make LocalActorRef.! sync & add tests
Dec 1, 2023
00069e3
wip: start working on PersistenceAdapterTest
Dec 1, 2023
8d0fdf3
wip: add tests for journaller/snaps=shotter failures
Dec 5, 2023
14212bf
wip: add tests for snapshotter timeouts
Dec 6, 2023
b1edd41
wip: add timeout into LocalActorRef
Dec 6, 2023
c04d036
wip: use partial func in LocalActorRef
Dec 6, 2023
339d37d
wip: add seqNr tracking
Dec 7, 2023
db68486
connect akka interop and EventSourcedPersistence
Dec 7, 2023
7e80488
add all new classes
Dec 7, 2023
bc9b9be
fix fromSeqNr
Dec 7, 2023
68b2443
non-func renaming
Dec 7, 2023
e88a92b
add persistenceOf test
Dec 9, 2023
f4ca6fe
add persistenceOf test
Dec 9, 2023
3a477bd
fix 2.12 tests
Dec 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@ project/plugins/project/
# Mac
.DS_Store

ignored
metals.sbt
.metals
.bloop

ignored
42 changes: 42 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version = 3.7.3

runner.dialect = scala213source3

maxColumn = 140

style = defaultWithAlign

continuationIndent {
callSite = 2
defnSite = 2
}

rewrite.rules = [
RedundantBraces, RedundantParens, SortModifiers, prefercurlyfors
]

newlines {
penalizeSingleSelectMultiArgList = false
alwaysBeforeElseAfterCurlyIf = false
beforeCurlyLambdaParams = multilineWithCaseOnly
}

runner.optimizer.forceConfigStyleMinArgCount = 1

align {

openParenDefnSite = false
openParenCallSite = false

tokenCategory {
Equals = Assign
LeftArrow = Assign
}
}

lineEndings = unix

importSelectors = noBinPack

danglingParentheses.preset = true
binPack.literalArgumentLists = true
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[akkaeffect] object Act {
}
}

def receive(receive: Actor.Receive) = {
def receive(receive: Actor.Receive): Actor.Receive = {
val receiveMsg: Actor.Receive = { case Msg(f) => f() }
syncReceive(receiveMsg orElse receive)
}
Expand Down
26 changes: 23 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ lazy val root = (project in file(".")
`actor-tests`,
testkit,
persistence,
`persistence-api`,
eventsourcing,
cluster,
`cluster-sharding`))
Expand Down Expand Up @@ -64,13 +65,32 @@ lazy val testkit = (project in file("testkit")
Akka.testkit % Test,
scalatest % Test)))

lazy val `persistence-api` = (project in file("persistence-api")
settings (name := "akka-effect-persistence-api")
settings commonSettings
dependsOn(
actor % "test->test;compile->compile",
testkit % "test->test;test->compile",
`actor-tests` % "test->test")
settings (
libraryDependencies ++= Seq(
Cats.core,
CatsEffect.effect,
`cats-helper`,
sstream,
Akka.persistence, // temporal dependency
Akka.slf4j % Test,
Akka.testkit % Test,
scalatest % Test)))

lazy val persistence = (project in file("persistence")
settings (name := "akka-effect-persistence")
settings commonSettings
dependsOn(
actor % "test->test;compile->compile",
testkit % "test->test;test->compile",
`actor-tests` % "test->test")
`persistence-api` % "test->test;compile->compile",
actor % "test->test;compile->compile",
testkit % "test->test;test->compile",
`actor-tests` % "test->test")
settings (
libraryDependencies ++= Seq(
Akka.actor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ object JournalKeeper {

def delete(criteria: SnapshotSelectionCriteria) = {

delete(Snapshotter.Criteria(criteria))

}

def delete(criteria: Snapshotter.Criteria): F[F[Unit]] = {

def selected(meta: SnapshotMetadata) = {
meta.seqNr <= criteria.maxSequenceNr && meta.timestamp.toEpochMilli <= criteria.maxSequenceNr
}
Expand All @@ -346,6 +352,7 @@ object JournalKeeper {
.map { _.joinWithNever }
}
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,13 @@ object JournalKeeperTest {
.add(Action.DeleteSnapshots(criteria))
.map { _.pure[F] }
}

def delete(criteria: Snapshotter.Criteria): F[F[Unit]] = {
actions
.add(Action.DeleteSnapshots(criteria.asAkka))
.map { _.pure[F] }
}

}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.evolutiongaming.akkaeffect.persistence

import cats.implicits._
import cats.{Applicative, FlatMap, Monad, ~>}
import com.evolutiongaming.akkaeffect.Fail
import com.evolutiongaming.catshelper.{Log, MeasureDuration, MonadThrowable}

trait Append[F[_], -A] {

/**
* @param events to be saved, inner Nel[A] will be persisted atomically, outer Nel[_] is for batching
* @return SeqNr of last event
*/
def apply(events: Events[A]): F[F[SeqNr]]
}

object Append {

def const[F[_], A](seqNr: F[F[SeqNr]]): Append[F, A] = {
class Const
new Const with Append[F, A] {
def apply(events: Events[A]) = seqNr
}
}

def empty[F[_]: Applicative, A]: Append[F, A] =
const(SeqNr.Min.pure[F].pure[F])

implicit class AppendOps[F[_], A](val self: Append[F, A]) extends AnyVal {

def mapK[G[_]: Applicative](f: F ~> G): Append[G, A] = { events =>
f(self(events)).map { a =>
f(a)
}
}

def convert[B](f: B => F[A])(implicit F: Monad[F]): Append[F, B] = {
events =>
{
for {
events <- events.traverse(f)
seqNr <- self(events)
} yield seqNr
}
}

def narrow[B <: A]: Append[F, B] = events => self(events)

def withLogging1(log: Log[F])(
implicit
F: FlatMap[F],
measureDuration: MeasureDuration[F]
): Append[F, A] = events => {
for {
d <- MeasureDuration[F].start
r <- self(events)
} yield
for {
r <- r
d <- d
_ <- log.debug(s"append ${events.size} events in ${d.toMillis}ms")
} yield r
}

def withFail(fail: Fail[F])(implicit F: MonadThrowable[F]): Append[F, A] = {
events =>
fail.adapt(s"failed to append $events") { self(events) }
}
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package com.evolutiongaming.akkaeffect.persistence

import akka.persistence.DeleteEventsToInterop
import cats.effect.{Resource, Sync}
import cats.syntax.all._
import cats.{Applicative, FlatMap, ~>}
import com.evolutiongaming.akkaeffect.Fail
import com.evolutiongaming.catshelper.{FromFuture, Log, MeasureDuration, MonadThrowable}

import scala.concurrent.duration.FiniteDuration
import com.evolutiongaming.catshelper.{Log, MeasureDuration, MonadThrowable}

/**
* @see [[akka.persistence.Eventsourced.deleteMessages]]
Expand All @@ -31,15 +27,6 @@ object DeleteEventsTo {
}
}


private[akkaeffect] def of[F[_]: Sync: FromFuture, A](
persistentActor: akka.persistence.PersistentActor,
timeout: FiniteDuration
): Resource[F, DeleteEventsTo[F]] = {
DeleteEventsToInterop(persistentActor, timeout)
}


private sealed abstract class WithLogging

private sealed abstract class WithFail
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.evolutiongaming.akkaeffect.persistence

trait Event[E] {

def event: E
def seqNr: SeqNr

}

object Event {

private case class Const[E](event: E, seqNr: SeqNr) extends Event[E]

def const[E](event: E, seqNr: SeqNr): Event[E] = Const(event, seqNr)

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ final case class EventSourcedId(value: String) {

object EventSourcedId {

implicit val orderEventSourcedId: Order[EventSourcedId] = Order.by { a: EventSourcedId => a.value }
implicit val orderEventSourcedId: Order[EventSourcedId] = Order.by {
a: EventSourcedId =>
a.value
}

implicit val showEventSourcedId: Show[EventSourcedId] = Show.fromToString
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.evolutiongaming.akkaeffect.persistence

import com.evolutiongaming.sstream.Stream
import cats.Applicative
import cats.syntax.all._

/** Event sourcing persistence API: provides snapshot followed by stream of events. After recovery completed, provides instances of
* [[Journaller]] and [[Snapshotter]].
*
* @tparam F
* Effect type.
* @tparam S
* Snapshot type.
* @tparam E
* Event type.
*/
trait EventSourcedPersistence[F[_], S, E] {

import EventSourcedPersistence.Recovery

/** Start recovery by retrieving snapshot (eager, happening on [[F]]) and preparing for loading events (lazy op, happens on
* [[Recovery#events()]] stream materialisation).
* @return
* Instance of [[Recovery]] that represents started recovery.
*/
def recover: F[Recovery[F, S, E]]

/** Create [[Journaller]] capable of persisting and deleting events.
* @param seqNr
* Recovered [[SeqNr]] or [[SeqNr.Min]] if nothing was recovered.
*/
def journaller(seqNr: SeqNr): F[Journaller[F, E]]

/** Create [[Snapshotter]] capable of persisting and deleting snapshots.
*/
def snapshotter: F[Snapshotter[F, S]]
}

object EventSourcedPersistence {

/** Representation of __started__ recovery process: snapshot is already loaded in memory (if any) while events will be loaded only on
* materialisation of [[Stream]]
*
* @tparam F
* effect
* @tparam S
* snapshot
* @tparam E
* event
*/
trait Recovery[F[_], S, E] {

def snapshot: Option[Snapshot[S]]
def events: Stream[F, Event[E]]

}

object Recovery {

private case class Const[F[_], S, E](snapshot: Option[Snapshot[S]], events: Stream[F, Event[E]]) extends Recovery[F, S, E]

def const[F[_], S, E](snapshot: Option[Snapshot[S]], events: Stream[F, Event[E]]): Recovery[F, S, E] =
Const(snapshot, events)

}

def const[F[_]: Applicative, S, E](
recovery: Recovery[F, S, E],
journaller: Journaller[F, E],
snapshotter: Snapshotter[F, S]
): EventSourcedPersistence[F, S, E] = {

val (r, j, s) = (recovery, journaller, snapshotter)

new EventSourcedPersistence[F, S, E] {

override def recover: F[Recovery[F, S, E]] = r.pure[F]

override def journaller(seqNr: SeqNr): F[Journaller[F, E]] = j.pure[F]

override def snapshotter: F[Snapshotter[F, S]] = s.pure[F]
}
}

}
Loading
Loading