diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala index 46c352cca7e..a72ce758a3b 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala @@ -444,8 +444,8 @@ class ReplicatedEventPublishingSpec MyReplicatedBehavior.Command, String, Set[String]] = - _.withReplicatedEventTransformation { (_, eventWithMeta) => - EventWithMetadata(eventWithMeta.event.toUpperCase, Nil) + _.withReplicatedEventsTransformation { (_, eventWithMeta) => + EventWithMetadata(eventWithMeta.event.toUpperCase, Nil) :: Nil } val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB), modifyBehavior = addTransformation)) actor ! MyReplicatedBehavior.Add("one", probe.ref) @@ -466,6 +466,40 @@ class ReplicatedEventPublishingSpec probe.expectMessage(Set("one", "TWO", "three")) } + "transform replicated events and emit additional events" in { + val id = nextEntityId() + val probe = createTestProbe[Any]() + case class Intercepted(origin: ReplicaId, seqNr: Long, event: String) + val addTransformation + : EventSourcedBehavior[MyReplicatedBehavior.Command, String, Set[String]] => EventSourcedBehavior[ + MyReplicatedBehavior.Command, + String, + Set[String]] = + _.withReplicatedEventsTransformation { (_, eventWithMeta) => + EventWithMetadata(eventWithMeta.event.toUpperCase + "-1", Nil) :: + EventWithMetadata(eventWithMeta.event.toUpperCase + "-2", Nil) :: + EventWithMetadata(eventWithMeta.event.toUpperCase + "-3", Nil) :: + Nil + } + val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB), modifyBehavior = addTransformation)) + actor ! MyReplicatedBehavior.Add("one", probe.ref) + probe.expectMessage(Done) + + // simulate a published event from another replica + actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( + ReplicationId(EntityType, id, DCB).persistenceId, + 1L, + "two", + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty, None)), + None) + actor ! MyReplicatedBehavior.Add("three", probe.ref) + probe.expectMessage(Done) + + actor ! MyReplicatedBehavior.Get(probe.ref) + probe.expectMessage(Set("one", "TWO-1", "TWO-2", "TWO-3", "three")) + } + } } diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala index 4bd0b278f0c..12a4a7b52b5 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala @@ -774,14 +774,14 @@ class ReplicatedEventSourcingSpec val addTransformation: ( EventSourcedBehavior[Command, String, State], ActorContext[Command]) => EventSourcedBehavior[Command, String, State] = { (behv, context) => - behv.withReplicatedEventTransformation { (_, eventWithMeta) => + behv.withReplicatedEventsTransformation { (_, eventWithMeta) => val resMeta1 = EventSourcedBehavior.currentMetadata[ReplicatedEventMetadata](context) val resMeta2 = eventWithMeta.metadata[ReplicatedEventMetadata] if (resMeta1 != resMeta2) throw new IllegalStateException(s"Expected RES metadata to be the same, $resMeta1 != $resMeta2") val newMeta = eventWithMeta.metadata[Meta].map(m => m.copy(m.value.toUpperCase)).toList - EventWithMetadata(eventWithMeta.event.toUpperCase, newMeta) + EventWithMetadata(eventWithMeta.event.toUpperCase, newMeta) :: Nil } } val r1 = spawn( @@ -830,6 +830,140 @@ class ReplicatedEventSourcingSpec r2 ! GetState(stateProbe.ref) stateProbe.expectMessage(State(Vector("FROM R1", "from r2"))) } + + "transform replicated events and emit additional events" in { + val entityId = nextEntityId + val probe = createTestProbe[Done]() + val eventProbe1 = createTestProbe[EventAndContext]() + val eventProbe2 = createTestProbe[EventAndContext]() + val addTransformation: ( + EventSourcedBehavior[Command, String, State], + ActorContext[Command]) => EventSourcedBehavior[Command, String, State] = { (behv, context) => + behv.withReplicatedEventsTransformation { (_, eventWithMeta) => + val resMeta1 = EventSourcedBehavior.currentMetadata[ReplicatedEventMetadata](context) + val resMeta2 = eventWithMeta.metadata[ReplicatedEventMetadata] + if (resMeta1 != resMeta2) + throw new IllegalStateException(s"Expected RES metadata to be the same, $resMeta1 != $resMeta2") + + if (eventWithMeta.event.startsWith("transformed")) { + // break the loop + eventWithMeta :: Nil + } else { + EventWithMetadata("transformed-1: " + eventWithMeta.event.toUpperCase, Meta("meta-1")) :: + EventWithMetadata("transformed-2: " + eventWithMeta.event.toUpperCase, Meta("meta-2")) :: + EventWithMetadata("transformed-3: " + eventWithMeta.event.toUpperCase, Meta("meta-3")) :: + Nil + } + } + } + val r1 = spawn( + testBehaviorWithContext(entityId, "R1", probe = Some(eventProbe1.ref), modifyBehavior = addTransformation)) + val r2 = spawn( + testBehaviorWithContext(entityId, "R2", probe = Some(eventProbe2.ref), modifyBehavior = addTransformation)) + + r1 ! StoreMeWithMeta("from r1", probe.ref, Meta("meta from r1")) + eventProbe1.expectMessage( + EventAndContext( + "from r1", + ReplicaId("R1"), + recoveryRunning = false, + concurrent = false, + Some(Meta("meta from r1")))) + // replicated to r2, and transformed + eventProbe2.expectMessage( + EventAndContext( + "transformed-1: FROM R1", + ReplicaId("R1"), + recoveryRunning = false, + concurrent = false, + meta = Some(Meta("meta-1")))) + eventProbe2.expectMessage( + EventAndContext( + "transformed-2: FROM R1", + ReplicaId("R2"), // this is R2 because it was R2 that emitted it + recoveryRunning = false, + concurrent = false, + meta = Some(Meta("meta-2")))) + eventProbe2.expectMessage( + EventAndContext( + "transformed-3: FROM R1", + ReplicaId("R2"), + recoveryRunning = false, + concurrent = false, + meta = Some(Meta("meta-3")))) + + // R2 emitted two additional events, and those are replicated back to R1 + eventProbe1.expectMessage( + EventAndContext( + "transformed-2: FROM R1", + ReplicaId("R2"), + recoveryRunning = false, + concurrent = false, + meta = Some(Meta("meta-2")))) + + eventProbe1.expectMessage( + EventAndContext( + "transformed-3: FROM R1", + ReplicaId("R2"), + recoveryRunning = false, + concurrent = false, + meta = Some(Meta("meta-3")))) + + eventProbe1.expectNoMessage() + eventProbe2.expectNoMessage() + + r2 ! StoreMeWithMeta("from r2", probe.ref, Meta("meta from r2")) + eventProbe2.expectMessage( + EventAndContext( + "from r2", + ReplicaId("R2"), + recoveryRunning = false, + concurrent = false, + Some(Meta("meta from r2")))) + // replicated to r1, and transformed + eventProbe1.expectMessage( + EventAndContext( + "transformed-1: FROM R2", + ReplicaId("R2"), + recoveryRunning = false, + concurrent = false, + meta = Some(Meta("meta-1")))) + eventProbe1.expectMessage( + EventAndContext( + "transformed-2: FROM R2", + ReplicaId("R1"), + recoveryRunning = false, + concurrent = false, + meta = Some(Meta("meta-2")))) + eventProbe1.expectMessage( + EventAndContext( + "transformed-3: FROM R2", + ReplicaId("R1"), + recoveryRunning = false, + concurrent = false, + meta = Some(Meta("meta-3")))) + + // R1 emitted two additional events, and those are replicated back to R2 + eventProbe2.expectMessage( + EventAndContext( + "transformed-2: FROM R2", + ReplicaId("R1"), + recoveryRunning = false, + concurrent = false, + meta = Some(Meta("meta-2")))) + + eventProbe2.expectMessage( + EventAndContext( + "transformed-3: FROM R2", + ReplicaId("R1"), + recoveryRunning = false, + concurrent = false, + meta = Some(Meta("meta-3")))) + + eventProbe1.expectNoMessage() + eventProbe2.expectNoMessage() + + } } } diff --git a/akka-persistence-typed/src/main/mima-filters/2.10.11.backwards.excludes/32823-res-transforms.excludes b/akka-persistence-typed/src/main/mima-filters/2.10.11.backwards.excludes/32823-res-transforms.excludes new file mode 100644 index 00000000000..76d38aa30c6 --- /dev/null +++ b/akka-persistence-typed/src/main/mima-filters/2.10.11.backwards.excludes/32823-res-transforms.excludes @@ -0,0 +1 @@ +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withReplicatedEventsTransformation") diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index 802f40e5b26..5530c0d04e7 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -66,7 +66,7 @@ private[akka] final class BehaviorSetup[C, E, S]( private var retentionInProgress: Boolean, val instrumentation: EventSourcedBehaviorInstrumentation, val replicationInterceptor: Option[ReplicationInterceptor[S, E]], - val replicatedEventTransformation: Option[(S, EventWithMetadata[E]) => EventWithMetadata[E]]) { + val replicatedEventTransformation: Option[(S, EventWithMetadata[E]) => Seq[EventWithMetadata[E]]]) { import BehaviorSetup._ import InternalProtocol.RecoveryTickEvent diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index c7cb317ea3a..eff805da2fe 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -121,7 +121,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( publishEvents: Boolean = true, customStashCapacity: Option[Int] = None, replicatedEventInterceptor: Option[ReplicationInterceptor[State, Event]] = None, - replicatedEventTransformation: Option[(State, EventWithMetadata[Event]) => EventWithMetadata[Event]] = None) + replicatedEventTransformation: Option[(State, EventWithMetadata[Event]) => Seq[EventWithMetadata[Event]]] = None) extends EventSourcedBehavior[Command, Event, State] { import EventSourcedBehaviorImpl.WriterIdentity @@ -345,6 +345,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( override def withReplicatedEventTransformation( f: (State, EventWithMetadata[Event]) => EventWithMetadata[Event]): EventSourcedBehavior[Command, Event, State] = + copy(replicatedEventTransformation = Some((s: State, e: EventWithMetadata[Event]) => f(s, e) :: Nil)) + + override def withReplicatedEventsTransformation(f: (State, EventWithMetadata[Event]) => Seq[EventWithMetadata[Event]]) + : EventSourcedBehavior[Command, Event, State] = copy(replicatedEventTransformation = Some(f)) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 2beaadae72a..0d03b7ef1ca 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -497,22 +497,24 @@ private[akka] object Running { setup.currentSequenceNumber = state.seqNr + 1 setup.currentMetadata = event.metadata val isConcurrent: Boolean = event.originVersion <> state.version - val updatedVersion = event.originVersion.merge(state.version) + var updatedVersion = event.originVersion.merge(state.version) if (setup.internalLogger.isDebugEnabled()) setup.internalLogger.debug( - "Processing event [{}] with version [{}]. Local version: {}. Updated version {}. Concurrent? {}", + "Processing replicated event [{}], origin [{}], origin seqNr [{}], concurrent [{}], with version [{}]. Local version: {}. Updated version {}.", Logging.simpleName(event.event.getClass), + event.originReplica, + event.originSequenceNr, + isConcurrent, event.originVersion, state.version, - updatedVersion, - isConcurrent) + updatedVersion) replication.setContext(recoveryRunning = false, event.originReplica, concurrent = isConcurrent) - val (transformedEvent, additionalMetadata) = setup.replicatedEventTransformation match { + val (transformedEvent, additionalMetadata, additionalEvents) = setup.replicatedEventTransformation match { case None => - (event.event, Nil) + (event.event, Nil, Nil) case Some(f) => val metadataEntries = event.metadata match { case None => Nil @@ -520,11 +522,10 @@ private[akka] object Running { case Some(meta) => meta :: Nil } val eventWithMetadata = EventWithMetadata(event.event, metadataEntries) - val newEventWithMetadata = f(state.state, eventWithMetadata) - if (newEventWithMetadata eq eventWithMetadata) - (event.event, Nil) // no change - else - (newEventWithMetadata.event, newEventWithMetadata.metadataEntries) + val newEventsWithMetadata = f(state.state, eventWithMetadata) + if (newEventsWithMetadata.isEmpty) + throw new IllegalStateException("At least one event is required in replicatedEventTransformation") + (newEventsWithMetadata.head.event, newEventsWithMetadata.head.metadataEntries, newEventsWithMetadata.tail) } val replicatedEventMetadata = @@ -542,10 +543,10 @@ private[akka] object Running { setup.currentMetadata = newMetadata // make new metadata visible to event handler val stateAfterApply = state.applyEvent(setup, transformedEvent) - val eventToPersist = adaptEvent(stateAfterApply.state, transformedEvent) + val adaptedEvent = adaptEvent(stateAfterApply.state, transformedEvent) val eventAdapterManifest = setup.eventAdapter.manifest(transformedEvent) - replication.clearContext() + val updatedSeen = stateAfterApply.seenPerReplica.updated(event.originReplica, event.originSequenceNr) val sideEffects = ackToOnPersisted match { case None => Nil @@ -555,20 +556,84 @@ private[akka] object Running { } :: Nil } - val newState2: RunningState[S] = - internalPersist(OptionVal.none, stateAfterApply, eventToPersist, eventAdapterManifest, newMetadata) + if (additionalEvents.isEmpty) { + replication.clearContext() + + val newState2: RunningState[S] = + internalPersist(OptionVal.none, stateAfterApply, adaptedEvent, eventAdapterManifest, newMetadata) + + val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, transformedEvent, newState2.seqNr) + + // one event, this is the normal case + persistingEvents( + newState2.copy(seenPerReplica = updatedSeen, version = updatedVersion), + state, + command = OptionVal.none[C], + numberOfEvents = 1, + shouldSnapshotAfterPersist, + shouldPublish = false, + sideEffects) + } else { + // additional events, this is similar to the persistAll case + + replication.setContext(recoveryRunning = false, replication.replicaId, concurrent = false) // local events are never concurrent + val replicatedEventMetadataTemplate = ReplicatedEventMetadata( + replication.replicaId, + 0L, // we replace it with actual seqnr later + updatedVersion, + concurrent = false) + + var currentState = stateAfterApply + var eventsToPersist + : List[EventToPersist] = EventToPersist(adaptedEvent, eventAdapterManifest, newMetadata) :: Nil + var shouldSnapshotAfterPersist = + setup.shouldSnapshot(stateAfterApply.state, transformedEvent, stateAfterApply.seqNr) + + additionalEvents.foreach { evtWithMeta => + val event = evtWithMeta.event + setup.currentSequenceNumber += 1 + setup.currentMetadata = CompositeMetadata.construct(evtWithMeta.metadataEntries) + val evtManifest = setup.eventAdapter.manifest(event) + val metadataEntries = { + updatedVersion = + updatedVersion.updated(replicatedEventMetadataTemplate.originReplica.id, setup.currentSequenceNumber) + if (setup.internalLogger.isTraceEnabled) + setup.internalLogger.trace( + "Additional event [{}] from replicated event , version vector [{}]", + Logging.simpleName(event.getClass), + updatedVersion) + currentState = currentState.copy(version = updatedVersion) + replicatedEventMetadataTemplate.copy( + originSequenceNr = setup.currentSequenceNumber, + version = updatedVersion) +: evtWithMeta.metadataEntries + } + val metadata = CompositeMetadata.construct(metadataEntries) + + currentState = currentState.applyEvent(setup, event) + if (shouldSnapshotAfterPersist == NoSnapshot) + shouldSnapshotAfterPersist = setup.shouldSnapshot(currentState.state, event, setup.currentSequenceNumber) + + val adaptedEvent = adaptEvent(currentState.state, event) + + eventsToPersist = EventToPersist(adaptedEvent, evtManifest, metadata) :: eventsToPersist + } - val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, transformedEvent, newState2.seqNr) - val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr) + replication.clearContext() + + val newState2 = + internalPersistAll(OptionVal.none, currentState, eventsToPersist.reverse) + + persistingEvents( + newState2.copy(seenPerReplica = updatedSeen, version = updatedVersion), + state, + command = OptionVal.none[C], + eventsToPersist.size, + shouldSnapshotAfterPersist, + shouldPublish = true, + sideEffects = sideEffects) + + } - persistingEvents( - newState2.copy(seenPerReplica = updatedSeen, version = updatedVersion), - state, - command = OptionVal.none[C], - numberOfEvents = 1, - shouldSnapshotAfterPersist, - shouldPublish = false, - sideEffects) } private def handleEventPersist( @@ -662,7 +727,7 @@ private[akka] object Running { case Some(template) => val updatedVersion = currentState.version.updated(template.originReplica.id, setup.currentSequenceNumber) - if (setup.internalLogger.isDebugEnabled) + if (setup.internalLogger.isTraceEnabled) setup.internalLogger.trace( "Processing event [{}] with version vector [{}]", Logging.simpleName(event.getClass), diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 713c6f36313..63c500585cd 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -343,8 +343,21 @@ object EventSourcedBehavior { */ @ApiMayChange @InternalStableApi + @deprecated("Use withReplicatedEventsTransformation", "2.10.12") def withReplicatedEventTransformation( f: (State, EventWithMetadata[Event]) => EventWithMetadata[Event]): EventSourcedBehavior[Command, Event, State] + + /** + * INTERNAL API: Invoke this transformation function when an event from another replica arrives, before persisting the event and + * before calling the ordinary event handler. The transformation function returns the updated event, and possibly + * additional events, and optionally additional metadata that will be stored together with the events. + * + * Only used when the entity is replicated. + */ + @ApiMayChange + @InternalStableApi + def withReplicatedEventsTransformation(f: (State, EventWithMetadata[Event]) => Seq[EventWithMetadata[Event]]) + : EventSourcedBehavior[Command, Event, State] } @FunctionalInterface