This project aims to build a bridge between akka and pure functional code based on cats-effect
Covered:
Represents ActorRef.tell
trait Tell[F[_], -A] {
def apply(a: A, sender: Option[ActorRef] = None): F[Unit]
}
Represents ActorRef.ask
pattern
trait Ask[F[_], -A, B] {
def apply(msg: A, timeout: FiniteDuration, sender: Option[ActorRef]): F[B]
}
Represents reply pattern: sender() ! reply
trait Reply[F[_], -A] {
def apply(msg: A): F[Unit]
}
This is what you need to implement instead of familiar new Actor { ... }
trait Receive[F[_], -A, B] {
def apply(msg: A): F[B]
def timeout: F[B]
}
Constructs Actor.scala
out of receive: ActorCtx[F] => Resource[F, Receive[F, Any]]
Wraps ActorContext
trait ActorCtx[F[_]] {
def self: ActorRef
def parent: ActorRef
def executor: ExecutionContextExecutor
def setReceiveTimeout(timeout: Duration): F[Unit]
def child(name: String): F[Option[ActorRef]]
def children: F[List[ActorRef]]
def actorRefFactory: ActorRefFactory
def watch[A](actorRef: ActorRef, msg: A): F[Unit]
def unwatch(actorRef: ActorRef): F[Unit]
def stop: F[Unit]
}
Constructs PersistentActor.scala
out of eventSourcedOf: ActorCtx[F] => F[EventSourced[F, S, E, C]]
Describes a lifecycle of entity with regard to event sourcing, phases are: Started, Recovering, Receiving and Termination
trait EventSourced[F[_], S, E, C] {
def eventSourcedId: EventSourcedId
def recovery: Recovery
def pluginIds: PluginIds
def start: Resource[F, RecoveryStarted[F, S, E, C]]
}
Describes start of recovery phase
trait RecoveryStarted[F[_], S, E, C] {
def apply(
seqNr: SeqNr,
snapshotOffer: Option[SnapshotOffer[S]]
): Resource[F, Recovering[F, S, E, C]]
}
Describes recovery phase
trait Recovering[F[_], S, E, C] {
def replay: Resource[F, Replay[F, E]]
def completed(
seqNr: SeqNr,
journaller: Journaller[F, E],
snapshotter: Snapshotter[F, S]
): Resource[F, Receive[F, C]]
}
Used during recovery to replay events
trait Replay[F[_], A] {
def apply(seqNr: SeqNr, event: A): F[Unit]
}
Describes communication with underlying journal
trait Journaller[F[_], -A] {
def append: Append[F, A]
def deleteTo: DeleteEventsTo[F]
}
Describes communication with underlying snapshot storage
/**
* Describes communication with underlying snapshot storage
*
* @tparam A - snapshot
*/
trait Snapshotter[F[_], -A] {
def save(seqNr: SeqNr, snapshot: A): F[F[Instant]]
def delete(seqNr: SeqNr): F[F[Unit]]
def delete(criteria: SnapshotSelectionCriteria): F[F[Unit]]
}
This is the main runtime/queue where all actions against your state are processed in desired eventsourcing sequence:
- validate and finalize events
- append events to journal
- publish changed state
- execute side effects
It is optimised for maximum throughput hence different steps of different actions might be executed in parallel as well as events might be stored in batches
trait Engine[F[_], S, E] {
def state: F[State[S]]
/**
* @return Outer F[_] is about `load` being enqueued, this immediately provides order guarantees
* Inner F[_] is about `load` being completed
*/
def apply[A](load: F[Validate[F, S, E, A]]): F[F[A]]
}
in build.sbt
addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")
libraryDependencies += "com.evolutiongaming" %% "akka-effect-actor" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-actor-tests" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-persistence" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-eventsourcing" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-cluster" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-cluster-sharding" % "0.2.1"