Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ class ReplicatedEventPublishingSpec
MyReplicatedBehavior.Command,
String,
Set[String]] =
_.withReplicatedEventTransformation { (_, eventWithMeta) =>
EventWithMetadata(eventWithMeta.event.toUpperCase, Nil)
_.withReplicatedEventsTransformation { (_, eventWithMeta) =>
EventWithMetadata(eventWithMeta.event.toUpperCase, Nil) :: Nil
}
val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB), modifyBehavior = addTransformation))
actor ! MyReplicatedBehavior.Add("one", probe.ref)
Expand All @@ -466,6 +466,40 @@ class ReplicatedEventPublishingSpec
probe.expectMessage(Set("one", "TWO", "three"))
}

"transform replicated events and emit additional events" in {
val id = nextEntityId()
val probe = createTestProbe[Any]()
case class Intercepted(origin: ReplicaId, seqNr: Long, event: String)
val addTransformation
: EventSourcedBehavior[MyReplicatedBehavior.Command, String, Set[String]] => EventSourcedBehavior[
MyReplicatedBehavior.Command,
String,
Set[String]] =
_.withReplicatedEventsTransformation { (_, eventWithMeta) =>
EventWithMetadata(eventWithMeta.event.toUpperCase + "-1", Nil) ::
EventWithMetadata(eventWithMeta.event.toUpperCase + "-2", Nil) ::
EventWithMetadata(eventWithMeta.event.toUpperCase + "-3", Nil) ::
Nil
}
val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB), modifyBehavior = addTransformation))
actor ! MyReplicatedBehavior.Add("one", probe.ref)
probe.expectMessage(Done)

// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
ReplicationId(EntityType, id, DCB).persistenceId,
1L,
"two",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty, None)),
None)
actor ! MyReplicatedBehavior.Add("three", probe.ref)
probe.expectMessage(Done)

actor ! MyReplicatedBehavior.Get(probe.ref)
probe.expectMessage(Set("one", "TWO-1", "TWO-2", "TWO-3", "three"))
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -774,14 +774,14 @@ class ReplicatedEventSourcingSpec
val addTransformation: (
EventSourcedBehavior[Command, String, State],
ActorContext[Command]) => EventSourcedBehavior[Command, String, State] = { (behv, context) =>
behv.withReplicatedEventTransformation { (_, eventWithMeta) =>
behv.withReplicatedEventsTransformation { (_, eventWithMeta) =>
val resMeta1 = EventSourcedBehavior.currentMetadata[ReplicatedEventMetadata](context)
val resMeta2 = eventWithMeta.metadata[ReplicatedEventMetadata]
if (resMeta1 != resMeta2)
throw new IllegalStateException(s"Expected RES metadata to be the same, $resMeta1 != $resMeta2")

val newMeta = eventWithMeta.metadata[Meta].map(m => m.copy(m.value.toUpperCase)).toList
EventWithMetadata(eventWithMeta.event.toUpperCase, newMeta)
EventWithMetadata(eventWithMeta.event.toUpperCase, newMeta) :: Nil
}
}
val r1 = spawn(
Expand Down Expand Up @@ -830,6 +830,140 @@ class ReplicatedEventSourcingSpec
r2 ! GetState(stateProbe.ref)
stateProbe.expectMessage(State(Vector("FROM R1", "from r2")))
}

"transform replicated events and emit additional events" in {
val entityId = nextEntityId
val probe = createTestProbe[Done]()
val eventProbe1 = createTestProbe[EventAndContext]()
val eventProbe2 = createTestProbe[EventAndContext]()
val addTransformation: (
EventSourcedBehavior[Command, String, State],
ActorContext[Command]) => EventSourcedBehavior[Command, String, State] = { (behv, context) =>
behv.withReplicatedEventsTransformation { (_, eventWithMeta) =>
val resMeta1 = EventSourcedBehavior.currentMetadata[ReplicatedEventMetadata](context)
val resMeta2 = eventWithMeta.metadata[ReplicatedEventMetadata]
if (resMeta1 != resMeta2)
throw new IllegalStateException(s"Expected RES metadata to be the same, $resMeta1 != $resMeta2")

if (eventWithMeta.event.startsWith("transformed")) {
// break the loop
eventWithMeta :: Nil
} else {
EventWithMetadata("transformed-1: " + eventWithMeta.event.toUpperCase, Meta("meta-1")) ::
EventWithMetadata("transformed-2: " + eventWithMeta.event.toUpperCase, Meta("meta-2")) ::
EventWithMetadata("transformed-3: " + eventWithMeta.event.toUpperCase, Meta("meta-3")) ::
Nil
}
}
}
val r1 = spawn(
testBehaviorWithContext(entityId, "R1", probe = Some(eventProbe1.ref), modifyBehavior = addTransformation))
val r2 = spawn(
testBehaviorWithContext(entityId, "R2", probe = Some(eventProbe2.ref), modifyBehavior = addTransformation))

r1 ! StoreMeWithMeta("from r1", probe.ref, Meta("meta from r1"))
eventProbe1.expectMessage(
EventAndContext(
"from r1",
ReplicaId("R1"),
recoveryRunning = false,
concurrent = false,
Some(Meta("meta from r1"))))
// replicated to r2, and transformed
eventProbe2.expectMessage(
EventAndContext(
"transformed-1: FROM R1",
ReplicaId("R1"),
recoveryRunning = false,
concurrent = false,
meta = Some(Meta("meta-1"))))
eventProbe2.expectMessage(
EventAndContext(
"transformed-2: FROM R1",
ReplicaId("R2"), // this is R2 because it was R2 that emitted it
recoveryRunning = false,
concurrent = false,
meta = Some(Meta("meta-2"))))
eventProbe2.expectMessage(
EventAndContext(
"transformed-3: FROM R1",
ReplicaId("R2"),
recoveryRunning = false,
concurrent = false,
meta = Some(Meta("meta-3"))))

// R2 emitted two additional events, and those are replicated back to R1
eventProbe1.expectMessage(
EventAndContext(
"transformed-2: FROM R1",
ReplicaId("R2"),
recoveryRunning = false,
concurrent = false,
meta = Some(Meta("meta-2"))))

eventProbe1.expectMessage(
EventAndContext(
"transformed-3: FROM R1",
ReplicaId("R2"),
recoveryRunning = false,
concurrent = false,
meta = Some(Meta("meta-3"))))

eventProbe1.expectNoMessage()
eventProbe2.expectNoMessage()

r2 ! StoreMeWithMeta("from r2", probe.ref, Meta("meta from r2"))
eventProbe2.expectMessage(
EventAndContext(
"from r2",
ReplicaId("R2"),
recoveryRunning = false,
concurrent = false,
Some(Meta("meta from r2"))))
// replicated to r1, and transformed
eventProbe1.expectMessage(
EventAndContext(
"transformed-1: FROM R2",
ReplicaId("R2"),
recoveryRunning = false,
concurrent = false,
meta = Some(Meta("meta-1"))))
eventProbe1.expectMessage(
EventAndContext(
"transformed-2: FROM R2",
ReplicaId("R1"),
recoveryRunning = false,
concurrent = false,
meta = Some(Meta("meta-2"))))
eventProbe1.expectMessage(
EventAndContext(
"transformed-3: FROM R2",
ReplicaId("R1"),
recoveryRunning = false,
concurrent = false,
meta = Some(Meta("meta-3"))))

// R1 emitted two additional events, and those are replicated back to R2
eventProbe2.expectMessage(
EventAndContext(
"transformed-2: FROM R2",
ReplicaId("R1"),
recoveryRunning = false,
concurrent = false,
meta = Some(Meta("meta-2"))))

eventProbe2.expectMessage(
EventAndContext(
"transformed-3: FROM R2",
ReplicaId("R1"),
recoveryRunning = false,
concurrent = false,
meta = Some(Meta("meta-3"))))

eventProbe1.expectNoMessage()
eventProbe2.expectNoMessage()

}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withReplicatedEventsTransformation")
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private[akka] final class BehaviorSetup[C, E, S](
private var retentionInProgress: Boolean,
val instrumentation: EventSourcedBehaviorInstrumentation,
val replicationInterceptor: Option[ReplicationInterceptor[S, E]],
val replicatedEventTransformation: Option[(S, EventWithMetadata[E]) => EventWithMetadata[E]]) {
val replicatedEventTransformation: Option[(S, EventWithMetadata[E]) => Seq[EventWithMetadata[E]]]) {

import BehaviorSetup._
import InternalProtocol.RecoveryTickEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
publishEvents: Boolean = true,
customStashCapacity: Option[Int] = None,
replicatedEventInterceptor: Option[ReplicationInterceptor[State, Event]] = None,
replicatedEventTransformation: Option[(State, EventWithMetadata[Event]) => EventWithMetadata[Event]] = None)
replicatedEventTransformation: Option[(State, EventWithMetadata[Event]) => Seq[EventWithMetadata[Event]]] = None)
extends EventSourcedBehavior[Command, Event, State] {

import EventSourcedBehaviorImpl.WriterIdentity
Expand Down Expand Up @@ -345,6 +345,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](

override def withReplicatedEventTransformation(
f: (State, EventWithMetadata[Event]) => EventWithMetadata[Event]): EventSourcedBehavior[Command, Event, State] =
copy(replicatedEventTransformation = Some((s: State, e: EventWithMetadata[Event]) => f(s, e) :: Nil))

override def withReplicatedEventsTransformation(f: (State, EventWithMetadata[Event]) => Seq[EventWithMetadata[Event]])
: EventSourcedBehavior[Command, Event, State] =
copy(replicatedEventTransformation = Some(f))

}
Expand Down
Loading