Skip to content

Commit a1c5b85

Browse files
dfakhritdinovDenys Fakhritdinov
andauthored
SHRIKE-54: expose notion of persistent snapshot in backward-compatible way (#321)
Done as backward-compatible alternative to #318 The PR should be merged after #319 to avoid breaking changes & major version bump. --------- Co-authored-by: Denys Fakhritdinov <dfakhritdinov@evolution.com>
1 parent e0cac26 commit a1c5b85

File tree

5 files changed

+68
-23
lines changed

5 files changed

+68
-23
lines changed

persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOf.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ object EventSourcedActorOf {
116116

117117
_ <- log.debug(s"recovery completed with seqNr $seqNr")
118118
journaller <- eventStore.asJournaller(actorCtx, seqNr).toResource
119-
receive <- recovering.completed(seqNr, journaller, snapshotStore.asSnapshotter)
119+
context = Recovering.RecoveryContext(seqNr, journaller, snapshotStore.asSnapshotter)
120+
receive <- recovering.completed(context)
120121
} yield receive
121122

122123
receive.onError {
@@ -125,7 +126,7 @@ object EventSourcedActorOf {
125126
}
126127
}
127128

128-
implicit final private class SnapshotOps[S](val snapshot: SnapshotStore.Offer[S]) extends AnyVal {
129+
implicit final private[evolutiongaming] class SnapshotOps[S](val snapshot: SnapshotStore.Offer[S]) extends AnyVal {
129130

130131
def asOffer: SnapshotOffer[S] =
131132
SnapshotOffer(
@@ -135,7 +136,8 @@ object EventSourcedActorOf {
135136

136137
}
137138

138-
implicit final private class SnapshotStoreOps[F[_], A](val store: SnapshotStore[F, A]) extends AnyVal {
139+
implicit final private[evolutiongaming] class SnapshotStoreOps[F[_], A](val store: SnapshotStore[F, A])
140+
extends AnyVal {
139141

140142
def asSnapshotter: Snapshotter[F, A] = new Snapshotter[F, A] {
141143

@@ -154,7 +156,7 @@ object EventSourcedActorOf {
154156
}
155157
}
156158

157-
implicit final private class EventStoreOps[F[_], E](val store: EventStore[F, E]) extends AnyVal {
159+
implicit final private[evolutiongaming] class EventStoreOps[F[_], E](val store: EventStore[F, E]) extends AnyVal {
158160

159161
def asJournaller(actorCtx: ActorCtx[F], seqNr: SeqNr)(implicit F: Concurrent[F], log: Log[F]): F[Journaller[F, E]] =
160162
for {

persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Persistence.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ private[akkaeffect] object Persistence {
5858
) = {
5959
val receive = for {
6060
recovering <- recoveryStarted(seqNr, none)
61-
receive <- recovering.completed(seqNr, journaller, snapshotter)
61+
context = Recovering.RecoveryContext(seqNr, journaller, snapshotter)
62+
receive <- recovering.completed(context)
6263
} yield Persistence.receive[F, S, E, C](receive)
6364
receive.toReleasable
6465
}
@@ -113,8 +114,9 @@ private[akkaeffect] object Persistence {
113114
.foldMapM(_.release)
114115
.toResource
115116
.productR {
117+
val context = Recovering.RecoveryContext(seqNr, journaller, snapshotter)
116118
recovering
117-
.completed(seqNr, journaller, snapshotter)
119+
.completed(context)
118120
.map(receive => Persistence.receive[F, S, E, C](receive))
119121
}
120122
.toReleasable

persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Recovering.scala

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import cats.effect.Resource
55
import cats.implicits.catsSyntaxApplicativeId
66
import com.evolutiongaming.akkaeffect.{Envelope, Receive}
77

8+
import scala.annotation.nowarn
9+
810
/** Describes "Recovery" phase
911
*
1012
* @tparam S
@@ -25,15 +27,54 @@ trait Recovering[F[_], S, E, +A] {
2527
* @see
2628
* [[akka.persistence.RecoveryCompleted]]
2729
*/
30+
@deprecated("Use completed with RecoveryContext", "4.1.5")
2831
def completed(
2932
seqNr: SeqNr,
3033
journaller: Journaller[F, E],
3134
snapshotter: Snapshotter[F, S],
3235
): Resource[F, A]
36+
37+
/** Called when recovery completed, resource will be released upon actor termination
38+
*
39+
* @see
40+
* [[akka.persistence.RecoveryCompleted]]
41+
*/
42+
def completed(context: Recovering.RecoveryContext[F, S, E]): Resource[F, A] = {
43+
@nowarn("msg=deprecated")
44+
val a = completed(context.seqNr, context.journaller, context.snapshotter)
45+
a
46+
}
47+
3348
}
3449

3550
object Recovering {
3651

52+
/** Context containing information about recovery and provides access to journaller and snapshotter
53+
*/
54+
trait RecoveryContext[F[_], -S, -E] {
55+
def seqNr: SeqNr
56+
def journaller: Journaller[F, E]
57+
def snapshotter: Snapshotter[F, S]
58+
def recoveredFromPersistence: Boolean
59+
}
60+
object RecoveryContext {
61+
62+
private case class Impl[F[_], S, E](
63+
seqNr: SeqNr,
64+
journaller: Journaller[F, E],
65+
snapshotter: Snapshotter[F, S],
66+
recoveredFromPersistence: Boolean,
67+
) extends RecoveryContext[F, S, E]
68+
69+
def apply[F[_], S, E](
70+
seqNr: SeqNr,
71+
journaller: Journaller[F, E],
72+
snapshotter: Snapshotter[F, S],
73+
recoveredFromPersistence: Boolean = true,
74+
): RecoveryContext[F, S, E] = Impl(seqNr, journaller, snapshotter, recoveredFromPersistence)
75+
76+
}
77+
3778
def apply[S]: Apply[S] = new Apply[S]
3879

3980
final private[Recovering] class Apply[S](private val b: Boolean = true) extends AnyVal {
@@ -91,9 +132,8 @@ object Recovering {
91132
) = {
92133
val journaller1 = journaller.convert(ef)
93134
val snapshotter1 = snapshotter.convert(sf)
94-
self
95-
.completed(seqNr, journaller1, snapshotter1)
96-
.flatMap(af)
135+
val context1 = RecoveryContext(seqNr, journaller1, snapshotter1)
136+
self.completed(context1).flatMap(af)
97137
}
98138
}
99139

@@ -105,10 +145,10 @@ object Recovering {
105145
seqNr: SeqNr,
106146
journaller: Journaller[F, E],
107147
snapshotter: Snapshotter[F, S],
108-
) =
109-
self
110-
.completed(seqNr, journaller, snapshotter)
111-
.map(f)
148+
) = {
149+
val context = RecoveryContext(seqNr, journaller, snapshotter)
150+
self.completed(context).map(f)
151+
}
112152
}
113153

114154
def mapM[A1](
@@ -121,10 +161,10 @@ object Recovering {
121161
seqNr: SeqNr,
122162
journaller: Journaller[F, E],
123163
snapshotter: Snapshotter[F, S],
124-
) =
125-
self
126-
.completed(seqNr, journaller, snapshotter)
127-
.flatMap(f)
164+
) = {
165+
val context = RecoveryContext(seqNr, journaller, snapshotter)
166+
self.completed(context).flatMap(f)
167+
}
128168
}
129169
}
130170

@@ -143,10 +183,10 @@ object Recovering {
143183
seqNr: SeqNr,
144184
journaller: Journaller[F, E1],
145185
snapshotter: Snapshotter[F, S1],
146-
) =
147-
self
148-
.completed(seqNr, journaller, snapshotter)
149-
.map(_.convert(cf, _.pure[F]))
186+
) = {
187+
val context = RecoveryContext(seqNr, journaller, snapshotter)
188+
self.completed(context).map(_.convert(cf, _.pure[F]))
189+
}
150190
}
151191

152192
def typeless(ef: Any => F[E], cf: Any => F[C])(implicit

persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/InstrumentEventSourced.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ object InstrumentEventSourced {
111111
}
112112

113113
for {
114-
receive <- recovering.completed(seqNr, journaller1, snapshotter1)
114+
context <- Recovering.RecoveryContext(seqNr, journaller1, snapshotter1).pure[Resource[F, *]]
115+
receive <- recovering.completed(context)
115116
_ <- resource(Action.ReceiveAllocated(seqNr), Action.ReceiveReleased)
116117
} yield Receive[Envelope[C]] { envelope =>
117118
for {

version.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ThisBuild / version := "4.1.5-SNAPSHOT"
1+
ThisBuild / version := "4.1.5"

0 commit comments

Comments
 (0)