Skip to content

Commit

Permalink
add Recovering factory method that uses RecoveryContext
Browse files Browse the repository at this point in the history
  • Loading branch information
Denys Fakhritdinov committed Jun 24, 2024
1 parent f2fbd7e commit b722684
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ object Recovering {

final private[Recovering] class Apply[S](private val b: Boolean = true) extends AnyVal {

@deprecated("Use apply with RecoveryContext", "4.1.7")
def apply[F[_], E, A](replay: Resource[F, Replay[F, E]])(
completed: (SeqNr, Journaller[F, E], Snapshotter[F, S]) => Resource[F, A],
): Recovering[F, S, E, A] = {
Expand All @@ -96,6 +97,26 @@ object Recovering {
completed1(seqNr, journaller, snapshotter)
}
}

def apply1[F[_], E, A](replay: Resource[F, Replay[F, E]])(
completed: Recovering.RecoveryContext[F, S, E] => Resource[F, A],
): Recovering[F, S, E, A] = {
val replay1 = replay
val completed1 = completed
new Recovering[F, S, E, A] {

override def replay = replay1

override def completed(
seqNr: SeqNr,
journaller: Journaller[F, E],
snapshotter: Snapshotter[F, S],
) = completed1(RecoveryContext(seqNr, journaller, snapshotter))

override def completed(ctx: RecoveryContext[F, S, E]) = completed1(ctx)
}
}

}

def const[S]: Const[S] = new Const[S]
Expand Down Expand Up @@ -135,6 +156,14 @@ object Recovering {
val context1 = RecoveryContext(seqNr, journaller1, snapshotter1)
self.completed(context1).flatMap(af)
}

override def completed(context: RecoveryContext[F, S1, E1]) = {
val journaller1 = context.journaller.convert(ef)
val snapshotter1 = context.snapshotter.convert(sf)
val context1 = RecoveryContext(context.seqNr, journaller1, snapshotter1)
self.completed(context1).flatMap(af)
}

}

def map[A1](f: A => A1): Recovering[F, S, E, A1] = new Recovering[F, S, E, A1] {
Expand All @@ -149,6 +178,9 @@ object Recovering {
val context = RecoveryContext(seqNr, journaller, snapshotter)
self.completed(context).map(f)
}

override def completed(context: RecoveryContext[F, S, E]) =
self.completed(context).map(f)
}

def mapM[A1](
Expand All @@ -165,6 +197,9 @@ object Recovering {
val context = RecoveryContext(seqNr, journaller, snapshotter)
self.completed(context).flatMap(f)
}

override def completed(context: RecoveryContext[F, S, E]) =
self.completed(context).flatMap(f)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,50 +98,54 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher
val recoveryStarted =
RecoveryStarted
.const {
Recovering[State] {
Replay.empty[F, Event].pure[Resource[F, *]]
} { (_, journaller, snapshotter) =>
for {
stateRef <- Ref[F].of(0).toResource
} yield Receive[Envelope[Cmd]] { envelope =>
val reply = Reply.fromActorRef[F](to = envelope.from, from = actorCtx.self)

envelope.msg match {
case a: Cmd.WithCtx[_] =>
for {
a <- a.f(actorCtx)
_ <- reply(a)
} yield false

case Cmd.Inc =>
for {
seqNr <- journaller.append(Events.of("a")).flatten
_ <- stateRef.update(_ + 1)
state <- stateRef.get
result <- snapshotter.save(seqNr, state)
seqNr <- journaller.append(Events.batched(Nel.of("b"), Nel.of("c", "d"))).flatten
_ <- result
_ <- stateRef.update(_ + 1)
_ <- reply(seqNr)
} yield false

case Cmd.Stop =>
for {
_ <- reply("stopping")
} yield true
}
} {
Recovering[State]
.apply1 {
Replay.empty[F, Event].pure[Resource[F, *]]
} { recoveringCtx =>
for {
_ <- actorCtx.setReceiveTimeout(Duration.Inf)
_ <- receiveTimeout
} yield false
}
.contramapM[Envelope[Any]] { envelope =>
envelope.msg
.castM[F, Cmd]
.map(a => envelope.copy(msg = a))
stateRef <- Ref[F].of(0).toResource
} yield Receive[Envelope[Cmd]] { envelope =>
val reply = Reply.fromActorRef[F](to = envelope.from, from = actorCtx.self)

envelope.msg match {
case a: Cmd.WithCtx[_] =>
for {
a <- a.f(actorCtx)
_ <- reply(a)
} yield false

case Cmd.Inc =>
for {
seqNr <- recoveringCtx.journaller.append(Events.of("a")).flatten
_ <- stateRef.update(_ + 1)
state <- stateRef.get
result <- recoveringCtx.snapshotter.save(seqNr, state)
seqNr <- recoveringCtx.journaller
.append(Events.batched(Nel.of("b"), Nel.of("c", "d")))
.flatten
_ <- result
_ <- stateRef.update(_ + 1)
_ <- reply(seqNr)
} yield false

case Cmd.Stop =>
for {
_ <- reply("stopping")
} yield true
}
} {
for {
_ <- actorCtx.setReceiveTimeout(Duration.Inf)
_ <- receiveTimeout
} yield false
}
}.pure[Resource[F, *]]
.contramapM[Envelope[Any]] { envelope =>
envelope.msg
.castM[F, Cmd]
.map(a => envelope.copy(msg = a))
}
}
.pure[Resource[F, *]]
}
.pure[Resource[F, *]]
EventSourced(EventSourcedId("id"), value = recoveryStarted).pure[F]
Expand Down Expand Up @@ -300,16 +304,18 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher
EventSourcedOf.const {
val recoveryStarted = {
val started = RecoveryStarted.const {
Recovering[S] {
Replay.empty[F, E].pure[Resource[F, *]]
} { (_, journaller, snapshotter) =>
val receive = for {
seqNr <- journaller.append(Events.of(0)).flatten
_ <- snapshotter.save(seqNr, 1).flatten
_ <- startedDeferred.complete(())
} yield Receive.const[Envelope[C]](false.pure[F])
receive.toResource
}.pure[Resource[F, *]]
Recovering[S]
.apply1 {
Replay.empty[F, E].pure[Resource[F, *]]
} { recoveringCtx =>
val receive = for {
seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten
_ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten
_ <- startedDeferred.complete(())
} yield Receive.const[Envelope[C]](false.pure[F])
receive.toResource
}
.pure[Resource[F, *]]
}

Resource
Expand Down Expand Up @@ -388,18 +394,20 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher
EventSourcedOf.const {
val recoveryStarted = {
val started = RecoveryStarted.const {
Recovering[S] {
Replay.empty[F, E].pure[Resource[F, *]]
} { (_, journaller, snapshotter) =>
val receive = for {
seqNr <- journaller.append(Events.of(0)).flatten
_ <- snapshotter.save(seqNr, 1).flatten
seqNr <- journaller.append(Events.of(1)).flatten
_ <- journaller.deleteTo(seqNr).flatten
_ <- startedDeferred.complete(())
} yield Receive.const[Envelope[C]](false.pure[F])
receive.toResource
}.pure[Resource[F, *]]
Recovering[S]
.apply1 {
Replay.empty[F, E].pure[Resource[F, *]]
} { recoveringCtx =>
val receive = for {
seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten
_ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten
seqNr <- recoveringCtx.journaller.append(Events.of(1)).flatten
_ <- recoveringCtx.journaller.deleteTo(seqNr).flatten
_ <- startedDeferred.complete(())
} yield Receive.const[Envelope[C]](false.pure[F])
receive.toResource
}
.pure[Resource[F, *]]
}

Resource
Expand Down Expand Up @@ -490,15 +498,17 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher
EventSourcedOf.const {
val recoveryStarted = {
val started = RecoveryStarted.const {
Recovering[S] {
Replay.empty[F, E].pure[Resource[F, *]]
} { (_, journaller, _) =>
val receive = for {
_ <- journaller.append(Events.batched(Nel.of(0, 1), Nel.of(2))).flatten
_ <- startedDeferred.complete(())
} yield Receive.const[Envelope[C]](false.pure[F])
receive.toResource
}.pure[Resource[F, *]]
Recovering[S]
.apply1 {
Replay.empty[F, E].pure[Resource[F, *]]
} { recoveringCtx =>
val receive = for {
_ <- recoveringCtx.journaller.append(Events.batched(Nel.of(0, 1), Nel.of(2))).flatten
_ <- startedDeferred.complete(())
} yield Receive.const[Envelope[C]](false.pure[F])
receive.toResource
}
.pure[Resource[F, *]]
}

Resource
Expand Down Expand Up @@ -574,18 +584,20 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher
EventSourcedOf.const {
val recoveryStarted = {
val started = RecoveryStarted.const {
Recovering[S] {
Replay.empty[F, E].pure[Resource[F, *]]
} { (_, journaller, snapshotter) =>
val receive = for {
seqNr <- journaller.append(Events.of(0)).flatten
_ <- snapshotter.save(seqNr, 1).flatten
_ <- journaller.append(Events.of(1)).flatten
_ <- snapshotter.delete(seqNr).flatten
_ <- startedDeferred.complete(())
} yield Receive.const[Envelope[C]](false.pure[F])
receive.toResource
}.pure[Resource[F, *]]
Recovering[S]
.apply1 {
Replay.empty[F, E].pure[Resource[F, *]]
} { recoveringCtx =>
val receive = for {
seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten
_ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten
_ <- recoveringCtx.journaller.append(Events.of(1)).flatten
_ <- recoveringCtx.snapshotter.delete(seqNr).flatten
_ <- startedDeferred.complete(())
} yield Receive.const[Envelope[C]](false.pure[F])
receive.toResource
}
.pure[Resource[F, *]]
}

Resource
Expand Down Expand Up @@ -678,18 +690,20 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher
EventSourcedOf.const {
val recoveryStarted = {
val started = RecoveryStarted.const {
Recovering[S] {
Replay.empty[F, E].pure[Resource[F, *]]
} { (_, journaller, snapshotter) =>
val receive = for {
seqNr <- journaller.append(Events.of(0)).flatten
_ <- snapshotter.save(seqNr, 1).flatten
_ <- journaller.append(Events.of(1)).flatten
_ <- startedDeferred.complete(())
} yield Receive.const[Envelope[C]](false.pure[F])

receive.toResource
}.pure[Resource[F, *]]
Recovering[S]
.apply1 {
Replay.empty[F, E].pure[Resource[F, *]]
} { recoveringCtx =>
val receive = for {
seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten
_ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten
_ <- recoveringCtx.journaller.append(Events.of(1)).flatten
_ <- startedDeferred.complete(())
} yield Receive.const[Envelope[C]](false.pure[F])

receive.toResource
}
.pure[Resource[F, *]]
}

Resource
Expand Down Expand Up @@ -778,15 +792,17 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher
.make(delay productR actorCtx.stop)(_ => stopped.complete(()).void)
.as {
RecoveryStarted.const {
Recovering[S](
Replay
.empty[F, E]
.pure[Resource[F, *]],
) { (_, _, _) =>
Receive
.const[Envelope[C]](false.pure[F])
.pure[Resource[F, *]]
}.pure[Resource[F, *]]
Recovering[S]
.apply1 {
Replay
.empty[F, E]
.pure[Resource[F, *]]
} { _ =>
Receive
.const[Envelope[C]](false.pure[F])
.pure[Resource[F, *]]
}
.pure[Resource[F, *]]
}
}
EventSourced(EventSourcedId("10"), value = recoveryStarted).pure[F]
Expand Down Expand Up @@ -979,17 +995,17 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher

for {
stateRef <- Ref[F].of(true).toResource
} yield Recovering[S] {
} yield Recovering[S].apply1 {
Replay.const[E](stateRef.set(false)).pure[Resource[F, *]]
} { (_, journaller, _) =>
} { recoveringCtx =>
def append: F[Unit] =
for {
state <- stateRef.get
result <-
if (state) {
events
.traverse { event =>
journaller.append(Events.of(event))
recoveringCtx.journaller.append(Events.of(event))
}
.flatMap(_.foldMapM(_.void))
} else {
Expand Down
Loading

0 comments on commit b722684

Please sign in to comment.