diff --git a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Recovering.scala b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Recovering.scala index 31bd7cd3..8bebbf03 100644 --- a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Recovering.scala +++ b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Recovering.scala @@ -79,6 +79,7 @@ object Recovering { final private[Recovering] class Apply[S](private val b: Boolean = true) extends AnyVal { + @deprecated("Use apply with RecoveryContext", "4.1.7") def apply[F[_], E, A](replay: Resource[F, Replay[F, E]])( completed: (SeqNr, Journaller[F, E], Snapshotter[F, S]) => Resource[F, A], ): Recovering[F, S, E, A] = { @@ -96,6 +97,26 @@ object Recovering { completed1(seqNr, journaller, snapshotter) } } + + def apply1[F[_], E, A](replay: Resource[F, Replay[F, E]])( + completed: Recovering.RecoveryContext[F, S, E] => Resource[F, A], + ): Recovering[F, S, E, A] = { + val replay1 = replay + val completed1 = completed + new Recovering[F, S, E, A] { + + override def replay = replay1 + + override def completed( + seqNr: SeqNr, + journaller: Journaller[F, E], + snapshotter: Snapshotter[F, S], + ) = completed1(RecoveryContext(seqNr, journaller, snapshotter)) + + override def completed(ctx: RecoveryContext[F, S, E]) = completed1(ctx) + } + } + } def const[S]: Const[S] = new Const[S] @@ -135,6 +156,14 @@ object Recovering { val context1 = RecoveryContext(seqNr, journaller1, snapshotter1) self.completed(context1).flatMap(af) } + + override def completed(context: RecoveryContext[F, S1, E1]) = { + val journaller1 = context.journaller.convert(ef) + val snapshotter1 = context.snapshotter.convert(sf) + val context1 = RecoveryContext(context.seqNr, journaller1, snapshotter1) + self.completed(context1).flatMap(af) + } + } def map[A1](f: A => A1): Recovering[F, S, E, A1] = new Recovering[F, S, E, A1] { @@ -149,6 +178,9 @@ object Recovering { val context = RecoveryContext(seqNr, journaller, snapshotter) self.completed(context).map(f) } + + override def completed(context: RecoveryContext[F, S, E]) = + self.completed(context).map(f) } def mapM[A1]( @@ -165,6 +197,9 @@ object Recovering { val context = RecoveryContext(seqNr, journaller, snapshotter) self.completed(context).flatMap(f) } + + override def completed(context: RecoveryContext[F, S, E]) = + self.completed(context).flatMap(f) } } diff --git a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOfTest.scala b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOfTest.scala index a6a895be..f058ff24 100644 --- a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOfTest.scala +++ b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOfTest.scala @@ -98,50 +98,54 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher val recoveryStarted = RecoveryStarted .const { - Recovering[State] { - Replay.empty[F, Event].pure[Resource[F, *]] - } { (_, journaller, snapshotter) => - for { - stateRef <- Ref[F].of(0).toResource - } yield Receive[Envelope[Cmd]] { envelope => - val reply = Reply.fromActorRef[F](to = envelope.from, from = actorCtx.self) - - envelope.msg match { - case a: Cmd.WithCtx[_] => - for { - a <- a.f(actorCtx) - _ <- reply(a) - } yield false - - case Cmd.Inc => - for { - seqNr <- journaller.append(Events.of("a")).flatten - _ <- stateRef.update(_ + 1) - state <- stateRef.get - result <- snapshotter.save(seqNr, state) - seqNr <- journaller.append(Events.batched(Nel.of("b"), Nel.of("c", "d"))).flatten - _ <- result - _ <- stateRef.update(_ + 1) - _ <- reply(seqNr) - } yield false - - case Cmd.Stop => - for { - _ <- reply("stopping") - } yield true - } - } { + Recovering[State] + .apply1 { + Replay.empty[F, Event].pure[Resource[F, *]] + } { recoveringCtx => for { - _ <- actorCtx.setReceiveTimeout(Duration.Inf) - _ <- receiveTimeout - } yield false - } - .contramapM[Envelope[Any]] { envelope => - envelope.msg - .castM[F, Cmd] - .map(a => envelope.copy(msg = a)) + stateRef <- Ref[F].of(0).toResource + } yield Receive[Envelope[Cmd]] { envelope => + val reply = Reply.fromActorRef[F](to = envelope.from, from = actorCtx.self) + + envelope.msg match { + case a: Cmd.WithCtx[_] => + for { + a <- a.f(actorCtx) + _ <- reply(a) + } yield false + + case Cmd.Inc => + for { + seqNr <- recoveringCtx.journaller.append(Events.of("a")).flatten + _ <- stateRef.update(_ + 1) + state <- stateRef.get + result <- recoveringCtx.snapshotter.save(seqNr, state) + seqNr <- recoveringCtx.journaller + .append(Events.batched(Nel.of("b"), Nel.of("c", "d"))) + .flatten + _ <- result + _ <- stateRef.update(_ + 1) + _ <- reply(seqNr) + } yield false + + case Cmd.Stop => + for { + _ <- reply("stopping") + } yield true + } + } { + for { + _ <- actorCtx.setReceiveTimeout(Duration.Inf) + _ <- receiveTimeout + } yield false } - }.pure[Resource[F, *]] + .contramapM[Envelope[Any]] { envelope => + envelope.msg + .castM[F, Cmd] + .map(a => envelope.copy(msg = a)) + } + } + .pure[Resource[F, *]] } .pure[Resource[F, *]] EventSourced(EventSourcedId("id"), value = recoveryStarted).pure[F] @@ -300,16 +304,18 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] { - Replay.empty[F, E].pure[Resource[F, *]] - } { (_, journaller, snapshotter) => - val receive = for { - seqNr <- journaller.append(Events.of(0)).flatten - _ <- snapshotter.save(seqNr, 1).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay.empty[F, E].pure[Resource[F, *]] + } { recoveringCtx => + val receive = for { + seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten + _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource + } + .pure[Resource[F, *]] } Resource @@ -388,18 +394,20 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] { - Replay.empty[F, E].pure[Resource[F, *]] - } { (_, journaller, snapshotter) => - val receive = for { - seqNr <- journaller.append(Events.of(0)).flatten - _ <- snapshotter.save(seqNr, 1).flatten - seqNr <- journaller.append(Events.of(1)).flatten - _ <- journaller.deleteTo(seqNr).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay.empty[F, E].pure[Resource[F, *]] + } { recoveringCtx => + val receive = for { + seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten + _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten + seqNr <- recoveringCtx.journaller.append(Events.of(1)).flatten + _ <- recoveringCtx.journaller.deleteTo(seqNr).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource + } + .pure[Resource[F, *]] } Resource @@ -490,15 +498,17 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] { - Replay.empty[F, E].pure[Resource[F, *]] - } { (_, journaller, _) => - val receive = for { - _ <- journaller.append(Events.batched(Nel.of(0, 1), Nel.of(2))).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay.empty[F, E].pure[Resource[F, *]] + } { recoveringCtx => + val receive = for { + _ <- recoveringCtx.journaller.append(Events.batched(Nel.of(0, 1), Nel.of(2))).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource + } + .pure[Resource[F, *]] } Resource @@ -574,18 +584,20 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] { - Replay.empty[F, E].pure[Resource[F, *]] - } { (_, journaller, snapshotter) => - val receive = for { - seqNr <- journaller.append(Events.of(0)).flatten - _ <- snapshotter.save(seqNr, 1).flatten - _ <- journaller.append(Events.of(1)).flatten - _ <- snapshotter.delete(seqNr).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay.empty[F, E].pure[Resource[F, *]] + } { recoveringCtx => + val receive = for { + seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten + _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten + _ <- recoveringCtx.journaller.append(Events.of(1)).flatten + _ <- recoveringCtx.snapshotter.delete(seqNr).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource + } + .pure[Resource[F, *]] } Resource @@ -678,18 +690,20 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] { - Replay.empty[F, E].pure[Resource[F, *]] - } { (_, journaller, snapshotter) => - val receive = for { - seqNr <- journaller.append(Events.of(0)).flatten - _ <- snapshotter.save(seqNr, 1).flatten - _ <- journaller.append(Events.of(1)).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - - receive.toResource - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay.empty[F, E].pure[Resource[F, *]] + } { recoveringCtx => + val receive = for { + seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten + _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten + _ <- recoveringCtx.journaller.append(Events.of(1)).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + + receive.toResource + } + .pure[Resource[F, *]] } Resource @@ -778,15 +792,17 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher .make(delay productR actorCtx.stop)(_ => stopped.complete(()).void) .as { RecoveryStarted.const { - Recovering[S]( - Replay - .empty[F, E] - .pure[Resource[F, *]], - ) { (_, _, _) => - Receive - .const[Envelope[C]](false.pure[F]) - .pure[Resource[F, *]] - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay + .empty[F, E] + .pure[Resource[F, *]] + } { _ => + Receive + .const[Envelope[C]](false.pure[F]) + .pure[Resource[F, *]] + } + .pure[Resource[F, *]] } } EventSourced(EventSourcedId("10"), value = recoveryStarted).pure[F] @@ -979,9 +995,9 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher for { stateRef <- Ref[F].of(true).toResource - } yield Recovering[S] { + } yield Recovering[S].apply1 { Replay.const[E](stateRef.set(false)).pure[Resource[F, *]] - } { (_, journaller, _) => + } { recoveringCtx => def append: F[Unit] = for { state <- stateRef.get @@ -989,7 +1005,7 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher if (state) { events .traverse { event => - journaller.append(Events.of(event)) + recoveringCtx.journaller.append(Events.of(event)) } .flatMap(_.foldMapM(_.void)) } else { diff --git a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/InstrumentEventSourced.scala b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/InstrumentEventSourced.scala index 9ff3b401..6c3c5f49 100644 --- a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/InstrumentEventSourced.scala +++ b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/InstrumentEventSourced.scala @@ -43,7 +43,7 @@ object InstrumentEventSourced { for { recovering <- recoveryStarted(seqNr, snapshotOffer) _ <- resource(Action.RecoveryAllocated(seqNr, snapshotOffer1), Action.RecoveryReleased) - } yield Recovering[S] { + } yield Recovering[S].apply1 { for { _ <- resource(Action.ReplayAllocated, Action.ReplayReleased) replay <- recovering.replay @@ -53,7 +53,9 @@ object InstrumentEventSourced { _ <- record(Action.Replayed(event, seqNr)) } yield {} } - } { (seqNr, journaller, snapshotter) => + } { recoveringCtx => + import recoveringCtx.* + val journaller1 = new Instrument with Journaller[F, E] { def append = events => @@ -111,9 +113,11 @@ object InstrumentEventSourced { } for { - context <- Recovering.RecoveryContext(seqNr, journaller1, snapshotter1).pure[Resource[F, *]] + context <- Recovering + .RecoveryContext(recoveringCtx.seqNr, journaller1, snapshotter1, recoveredFromPersistence) + .pure[Resource[F, *]] receive <- recovering.completed(context) - _ <- resource(Action.ReceiveAllocated(seqNr), Action.ReceiveReleased) + _ <- resource(Action.ReceiveAllocated(recoveringCtx.seqNr), Action.ReceiveReleased) } yield Receive[Envelope[C]] { envelope => for { stop <- receive(envelope) diff --git a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistenceFailureTest.scala b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistenceFailureTest.scala index d0e16cf4..397d910b 100644 --- a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistenceFailureTest.scala +++ b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistenceFailureTest.scala @@ -32,16 +32,18 @@ class PersistenceFailureTest extends AnyFunSuite with Matchers { eventSourcedId = EventSourcedId("test"), pluginIds = PluginIds(journal, snapshot), value = RecoveryStarted[Unit] { (_, _) => - Recovering[Unit] { - Replay.empty[IO, Event].pure[Resource[IO, *]] - } { (_, journaller, _) => - Receive[Envelope[Event]] { envelope => - // persist event in forked thread thus don't fail actor directly - journaller.append(Events.of(envelope.msg)).flatten.start.as(false) - } { - IO(true) - }.pure[Resource[IO, *]] - }.pure[Resource[IO, *]] + Recovering[Unit] + .apply1 { + Replay.empty[IO, Event].pure[Resource[IO, *]] + } { recoveringCtx => + Receive[Envelope[Event]] { envelope => + // persist event in forked thread thus don't fail actor directly + recoveringCtx.journaller.append(Events.of(envelope.msg)).flatten.start.as(false) + } { + IO(true) + }.pure[Resource[IO, *]] + } + .pure[Resource[IO, *]] }.typeless( sf = _ => IO.unit, ef = e => IO(e.asInstanceOf[Event]), diff --git a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistentActorOfTest.scala b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistentActorOfTest.scala index 9cda61c7..d4b82341 100644 --- a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistentActorOfTest.scala +++ b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistentActorOfTest.scala @@ -94,50 +94,54 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers val recoveryStarted = RecoveryStarted .const { - Recovering[State] { - Replay.empty[F, Event].pure[Resource[F, *]] - } { (_, journaller, snapshotter) => - for { - stateRef <- Ref[F].of(0).toResource - } yield Receive[Envelope[Cmd]] { envelope => - val reply = Reply.fromActorRef[F](to = envelope.from, from = actorCtx.self) - - envelope.msg match { - case a: Cmd.WithCtx[_] => - for { - a <- a.f(actorCtx) - _ <- reply(a) - } yield false - - case Cmd.Inc => - for { - seqNr <- journaller.append(Events.of("a")).flatten - _ <- stateRef.update(_ + 1) - state <- stateRef.get - result <- snapshotter.save(seqNr, state) - seqNr <- journaller.append(Events.batched(Nel.of("b"), Nel.of("c", "d"))).flatten - _ <- result - _ <- stateRef.update(_ + 1) - _ <- reply(seqNr) - } yield false - - case Cmd.Stop => - for { - _ <- reply("stopping") - } yield true - } - } { + Recovering[State] + .apply1 { + Replay.empty[F, Event].pure[Resource[F, *]] + } { recoveringCtx => for { - _ <- actorCtx.setReceiveTimeout(Duration.Inf) - _ <- receiveTimeout - } yield false - } - .contramapM[Envelope[Any]] { envelope => - envelope.msg - .castM[F, Cmd] - .map(a => envelope.copy(msg = a)) + stateRef <- Ref[F].of(0).toResource + } yield Receive[Envelope[Cmd]] { envelope => + val reply = Reply.fromActorRef[F](to = envelope.from, from = actorCtx.self) + + envelope.msg match { + case a: Cmd.WithCtx[_] => + for { + a <- a.f(actorCtx) + _ <- reply(a) + } yield false + + case Cmd.Inc => + for { + seqNr <- recoveringCtx.journaller.append(Events.of("a")).flatten + _ <- stateRef.update(_ + 1) + state <- stateRef.get + result <- recoveringCtx.snapshotter.save(seqNr, state) + seqNr <- recoveringCtx.journaller + .append(Events.batched(Nel.of("b"), Nel.of("c", "d"))) + .flatten + _ <- result + _ <- stateRef.update(_ + 1) + _ <- reply(seqNr) + } yield false + + case Cmd.Stop => + for { + _ <- reply("stopping") + } yield true + } + } { + for { + _ <- actorCtx.setReceiveTimeout(Duration.Inf) + _ <- receiveTimeout + } yield false } - }.pure[Resource[F, *]] + .contramapM[Envelope[Any]] { envelope => + envelope.msg + .castM[F, Cmd] + .map(a => envelope.copy(msg = a)) + } + } + .pure[Resource[F, *]] } .pure[Resource[F, *]] EventSourced(EventSourcedId("id"), value = recoveryStarted).pure[F] @@ -292,16 +296,18 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] { - Replay.empty[F, E].pure[Resource[F, *]] - } { (_, journaller, snapshotter) => - val receive = for { - seqNr <- journaller.append(Events.of(0)).flatten - _ <- snapshotter.save(seqNr, 1).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay.empty[F, E].pure[Resource[F, *]] + } { recoveringCtx => + val receive = for { + seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten + _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource + } + .pure[Resource[F, *]] } Resource @@ -376,18 +382,20 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] { - Replay.empty[F, E].pure[Resource[F, *]] - } { (_, journaller, snapshotter) => - val receive = for { - seqNr <- journaller.append(Events.of(0)).flatten - _ <- snapshotter.save(seqNr, 1).flatten - seqNr <- journaller.append(Events.of(1)).flatten - _ <- journaller.deleteTo(seqNr).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay.empty[F, E].pure[Resource[F, *]] + } { recoveringCtx => + val receive = for { + seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten + _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten + seqNr <- recoveringCtx.journaller.append(Events.of(1)).flatten + _ <- recoveringCtx.journaller.deleteTo(seqNr).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource + } + .pure[Resource[F, *]] } Resource @@ -474,15 +482,17 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] { - Replay.empty[F, E].pure[Resource[F, *]] - } { (_, journaller, _) => - val receive = for { - _ <- journaller.append(Events.batched(Nel.of(0, 1), Nel.of(2))).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay.empty[F, E].pure[Resource[F, *]] + } { recoveringCtx => + val receive = for { + _ <- recoveringCtx.journaller.append(Events.batched(Nel.of(0, 1), Nel.of(2))).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource + } + .pure[Resource[F, *]] } Resource @@ -556,18 +566,20 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] { - Replay.empty[F, E].pure[Resource[F, *]] - } { (_, journaller, snapshotter) => - val receive = for { - seqNr <- journaller.append(Events.of(0)).flatten - _ <- snapshotter.save(seqNr, 1).flatten - _ <- journaller.append(Events.of(1)).flatten - _ <- snapshotter.delete(seqNr).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay.empty[F, E].pure[Resource[F, *]] + } { recoveringCtx => + val receive = for { + seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten + _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten + _ <- recoveringCtx.journaller.append(Events.of(1)).flatten + _ <- recoveringCtx.snapshotter.delete(seqNr).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource + } + .pure[Resource[F, *]] } Resource @@ -658,18 +670,20 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] { - Replay.empty[F, E].pure[Resource[F, *]] - } { (_, journaller, snapshotter) => - val receive = for { - seqNr <- journaller.append(Events.of(0)).flatten - _ <- snapshotter.save(seqNr, 1).flatten - _ <- journaller.append(Events.of(1)).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - - receive.toResource - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay.empty[F, E].pure[Resource[F, *]] + } { recoveringCtx => + val receive = for { + seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten + _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten + _ <- recoveringCtx.journaller.append(Events.of(1)).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + + receive.toResource + } + .pure[Resource[F, *]] } Resource @@ -756,15 +770,17 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers .make(delay productR actorCtx.stop)(_ => stopped.complete(()).void) .as { RecoveryStarted.const { - Recovering[S]( - Replay - .empty[F, E] - .pure[Resource[F, *]], - ) { (_, _, _) => - Receive - .const[Envelope[C]](false.pure[F]) - .pure[Resource[F, *]] - }.pure[Resource[F, *]] + Recovering[S] + .apply1 { + Replay + .empty[F, E] + .pure[Resource[F, *]] + } { _ => + Receive + .const[Envelope[C]](false.pure[F]) + .pure[Resource[F, *]] + } + .pure[Resource[F, *]] } } EventSourced(EventSourcedId("10"), value = recoveryStarted).pure[F] @@ -951,9 +967,9 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers for { stateRef <- Ref[F].of(true).toResource - } yield Recovering[S] { + } yield Recovering[S].apply1 { Replay.const[E](stateRef.set(false)).pure[Resource[F, *]] - } { (_, journaller, _) => + } { recoveringCtx => def append: F[Unit] = for { state <- stateRef.get @@ -961,7 +977,7 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers if (state) { events .traverse { event => - journaller.append(Events.of(event)) + recoveringCtx.journaller.append(Events.of(event)) } .flatMap(_.foldMapM(_.void)) } else { diff --git a/version.sbt b/version.sbt index f9098247..e49e1309 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "4.1.7-SNAPSHOT" +ThisBuild / version := "4.1.7"