Skip to content

Commit

Permalink
Domain event subscriptions now accepts metadata as the first paramete…
Browse files Browse the repository at this point in the history
…r, besides just the event. The metadata currently contains the stream version and stream id, which can be useful when building projections.
  • Loading branch information
johanhaleby committed Jan 18, 2024
1 parent 0d0965f commit 7db40d6
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 41 deletions.
3 changes: 2 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
### Changelog next version
* spring-boot-starter-mongodb no longer autoconfigures itself by just importing the library in the classpath, instead you need to bootstrap by annotating your Spring Boot class with @EnableOccurrent.
* Fixed bug in spring-boot-starter-mongodb module in which it didn't automatically configured MongoDB.
* Fixed bug in spring-boot-starter-mongodb module in which it didn't automatically configured MongoDB.
* Domain event subscriptions now accepts metadata as the first parameter, besides just the event. The metadata currently contains the stream version and stream id, which can be useful when building projections.
* Upgraded from Kotlin 1.9.20 to 1.9.22
* Upgraded amqp-client from 5.16.0 to 5.20.0
* Upgraded Spring Boot from 3.1.4 to 3.2.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,28 @@ package org.occurrent.dsl.subscription.blocking
import io.cloudevents.CloudEvent
import org.occurrent.application.converter.CloudEventConverter
import org.occurrent.application.converter.get
import org.occurrent.cloudevents.OccurrentCloudEventExtension
import org.occurrent.condition.Condition
import org.occurrent.filter.Filter
import org.occurrent.subscription.OccurrentSubscriptionFilter
import org.occurrent.subscription.StartAt
import org.occurrent.subscription.api.blocking.Subscribable
import org.occurrent.subscription.api.blocking.Subscription
import java.util.function.BiConsumer
import java.util.function.Consumer
import kotlin.reflect.KClass

/**
* Metadata associated with the event, such as stream id and version and other CloudEvent extensions
* associated with the event.
*/
data class EventMetadata internal constructor(val data: Map<String, Any?>) {
val streamId: String get() = data[OccurrentCloudEventExtension.STREAM_ID] as String
val streamVersion: Long get() = data[OccurrentCloudEventExtension.STREAM_VERSION] as Long

inline operator fun <reified T : Any?> get(key: String) = data[key] as T
}

/**
* Subscription DSL entry-point. Usage example:
*
Expand All @@ -47,34 +60,60 @@ fun <E : Any> subscriptions(subscriptionModel: Subscribable, cloudEventConverter
Subscriptions(subscriptionModel, cloudEventConverter).apply(subscriptions)
}


class Subscriptions<E : Any>(private val subscriptionModel: Subscribable, private val cloudEventConverter: CloudEventConverter<E>) {

/**
* Create a new subscription that is invoked after a specific domain event is written to the event store
*/
@JvmName("subscribeAll")
fun subscribe(subscriptionId: String, startAt: StartAt? = null, fn: (E) -> Unit): Subscription {
return subscribe(subscriptionId, *emptyArray(), startAt = startAt) { e -> fn(e) }
return subscribe(subscriptionId, startAt) { _, e -> fn(e) }
}

/**
* Create a new subscription that is invoked after a specific domain event is written to the event store
*/
@JvmName("subscribeAll")
fun subscribe(subscriptionId: String, startAt: StartAt? = null, fn: (EventMetadata, E) -> Unit): Subscription {
return subscribe(subscriptionId, *emptyArray(), startAt = startAt) { metadata, e -> fn(metadata, e) }
}

/**
* Create a new subscription that is invoked after a specific domain event is written to the event store
*/
inline fun <reified E1 : E> subscribe(subscriptionId: String = E1::class.simpleName!!, startAt: StartAt? = null, crossinline fn: (E1) -> Unit): Subscription {
return subscribe(subscriptionId, E1::class, startAt = startAt) { e -> fn(e as E1) }
return subscribe(subscriptionId, E1::class, startAt = startAt) { _, e -> fn(e as E1) }
}

/**
* Create a new subscription that is invoked after a specific domain event is written to the event store
*/
inline fun <reified E1 : E> subscribe(subscriptionId: String = E1::class.simpleName!!, startAt: StartAt? = null, crossinline fn: (EventMetadata, E1) -> Unit): Subscription {
return subscribe(subscriptionId, E1::class, startAt = startAt) { metadata, e -> fn(metadata, e as E1) }
}


@JvmName("subscribeAnyOf")
inline fun <reified E1 : E, reified E2 : E> subscribe(subscriptionId: String, startAt: StartAt? = null, crossinline fn: (E) -> Unit): Subscription {
return subscribe(subscriptionId, E1::class, E2::class, startAt = startAt) { e -> fn(e) }
return subscribe(subscriptionId, E1::class, E2::class, startAt = startAt) { _, e -> fn(e) }
}

@JvmName("subscribeAnyOf")
inline fun <reified E1 : E, reified E2 : E> subscribe(subscriptionId: String, startAt: StartAt? = null, crossinline fn: (EventMetadata, E) -> Unit): Subscription {
return subscribe(subscriptionId, E1::class, E2::class, startAt = startAt) { metadata, e -> fn(metadata, e) }
}

@JvmOverloads
fun <E1 : E> subscribe(subscriptionId: String, eventType: Class<E>, startAt: StartAt? = null, fn: Consumer<E1>): Subscription {
return subscribe(subscriptionId, listOf(eventType), startAt) { e : E ->
fn.accept(e as E1)
}
}

@JvmOverloads
fun <E1 : E> subscribe(subscriptionId: String, eventType: Class<E>, startAt: StartAt? = null, fn: Consumer<E>): Subscription {
return subscribe(subscriptionId, listOf(eventType), startAt) { e ->
fn.accept(e)
fun <E1 : E> subscribe(subscriptionId: String, eventType: Class<E>, startAt: StartAt? = null, fn: BiConsumer<EventMetadata, E>): Subscription {
return subscribe(subscriptionId, listOf(eventType), startAt) { metadata, e ->
fn.accept(metadata, e)
}
}

Expand All @@ -83,20 +122,31 @@ class Subscriptions<E : Any>(private val subscriptionModel: Subscribable, privat
return subscribe(subscriptionId, *eventTypes.map { c -> c.kotlin }.toTypedArray(), startAt = startAt) { e -> fn.accept(e) }
}

@JvmOverloads
fun subscribe(subscriptionId: String, eventTypes: List<Class<out E>>, startAt: StartAt? = null, fn: BiConsumer<EventMetadata, E>): Subscription {
return subscribe(subscriptionId, *eventTypes.map { c -> c.kotlin }.toTypedArray(), startAt = startAt) { metadata, e -> fn.accept(metadata, e) }
}

fun subscribe(subscriptionId: String, vararg eventTypes: KClass<out E>, startAt: StartAt? = null, fn: (E) -> Unit): Subscription {
val condition = when {
eventTypes.isEmpty() -> null
eventTypes.size == 1 -> Condition.eq(cloudEventConverter[eventTypes[0]])
else -> Condition.or(eventTypes.map { e -> Condition.eq(cloudEventConverter[e]) })
}
val filter = OccurrentSubscriptionFilter.filter(if (condition == null) Filter.all() else Filter.type(condition))
val filter = subscriptionFilterFromEventTypes(eventTypes)
return subscribe(subscriptionId, filter, startAt, fn)
}

fun subscribe(subscriptionId: String, vararg eventTypes: KClass<out E>, startAt: StartAt? = null, fn: (EventMetadata, E) -> Unit): Subscription {
val filter = subscriptionFilterFromEventTypes(eventTypes)
return subscribe(subscriptionId, filter, startAt, fn)
}

fun subscribe(subscriptionId: String, filter: OccurrentSubscriptionFilter = OccurrentSubscriptionFilter.filter(Filter.all()), startAt: StartAt? = null, fn: (E) -> Unit): Subscription {
return subscribe(subscriptionId, filter, startAt) { _, e -> fn(e) }
}

fun subscribe(subscriptionId: String, filter: OccurrentSubscriptionFilter = OccurrentSubscriptionFilter.filter(Filter.all()), startAt: StartAt? = null, fn: (EventMetadata, E) -> Unit): Subscription {
val consumer: (CloudEvent) -> Unit = { cloudEvent ->
val event = cloudEventConverter[cloudEvent]
fn(event)
val metadataMap = cloudEvent.extensionNames.associateWith { extensionName -> cloudEvent.getExtension(extensionName) }
val eventMetadata = EventMetadata(metadataMap)
fn(eventMetadata, event)
}

val subscription = if (startAt == null) {
Expand All @@ -109,4 +159,14 @@ class Subscriptions<E : Any>(private val subscriptionModel: Subscribable, privat
waitUntilStarted()
}
}

private fun subscriptionFilterFromEventTypes(eventTypes: Array<out KClass<out E>>): OccurrentSubscriptionFilter {
val condition = when {
eventTypes.isEmpty() -> null
eventTypes.size == 1 -> Condition.eq(cloudEventConverter[eventTypes[0]])
else -> Condition.or(eventTypes.map { e -> Condition.eq(cloudEventConverter[e]) })
}
val filter = OccurrentSubscriptionFilter.filter(if (condition == null) Filter.all() else Filter.type(condition))
return filter
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,50 @@

package org.occurrent.dsl.view

import org.occurrent.dsl.subscription.blocking.EventMetadata
import org.occurrent.dsl.subscription.blocking.Subscriptions
import org.occurrent.subscription.StartAt
import org.occurrent.subscription.api.blocking.Subscription

inline fun <reified E : Any> Subscriptions<E>.updateView(viewName: String, startAt: StartAt? = null, crossinline updateFunction: (E) -> Unit): Subscription {
inline fun <reified E : Any> Subscriptions<E>.updateView(viewName: String, startAt: StartAt? = null, crossinline updateFunction: (EventMetadata, E) -> Unit): Subscription {
val eventTypes: List<Class<out E>> = if (E::class.isSealed) {
E::class.sealedSubclasses.map { it.java }.toList()
} else {
listOf(E::class.java)
}
return subscribe(viewName, eventTypes = eventTypes, startAt = startAt, fn = { e ->
updateFunction(e)
return subscribe(viewName, eventTypes = eventTypes, startAt = startAt, fn = { metadata, e ->
updateFunction(metadata, e)
})
}

inline fun <reified E : Any> Subscriptions<E>.updateView(viewName: String, startAt: StartAt? = null, crossinline updateFunction: (E) -> Unit): Subscription =
updateView(viewName, startAt) { _, e -> updateFunction(e) }

inline fun <reified E : Any> Subscriptions<E>.updateView(
viewName: String, materializedView: MaterializedView<E>, startAt: StartAt? = null,
crossinline doBeforeUpdate: (E) -> Unit = {},
crossinline doAfterUpdate: (E) -> Unit = {}
): Subscription =
updateView(viewName, startAt) { e ->
doBeforeUpdate(e)
materializedView.update(e)
doAfterUpdate(e)
updateView(
viewName,
converter = { _, e -> e },
startAt = startAt,
materializedView = materializedView,
doBeforeUpdate = doBeforeUpdate,
doAfterUpdate = doAfterUpdate
)

inline fun <reified E : Any, reified E2 : Any> Subscriptions<E>.updateView(
viewName: String,
crossinline converter: (EventMetadata, E) -> E2,
materializedView: MaterializedView<E2>,
startAt: StartAt? = null,
crossinline doBeforeUpdate: (E2) -> Unit = { },
crossinline doAfterUpdate: (E2) -> Unit = { }
): Subscription =
updateView(viewName, startAt) { metadata, e ->
val e2 = converter(metadata, e)
doBeforeUpdate(e2)
materializedView.update(e2)
doAfterUpdate(e2)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package org.occurrent.eventstore.api.blocking

import io.cloudevents.CloudEvent
import org.occurrent.eventstore.api.SortBy
import org.occurrent.eventstore.api.SortBy.SortDirection.ASCENDING
import org.occurrent.filter.Filter
import kotlin.streams.asSequence

Expand All @@ -30,5 +29,5 @@ fun EventStoreQueries.queryForSequence(
filter: Filter = Filter.all(),
skip: Int = 0,
limit: Int = Int.MAX_VALUE,
sortBy: SortBy = SortBy.natural(ASCENDING)
sortBy: SortBy = TODO()
): Sequence<CloudEvent> = query(filter, skip, limit, sortBy).asSequence()
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class GamePlayController(private val applicationService: ApplicationService<Game
fun playGame(@PathVariable("gameId") gameId: GameId, @RequestParam playerId: PlayerId, @RequestParam handGesture: HandGesture): ResponseEntity<Unit> {
log.info("Playing game (gameId=$gameId)")
val cmd = MakeHandGesture(gameId, Timestamp.now(), playerId, handGesture)
applicationService.execute(gameId, cmd, rps)
return ResponseEntity.noContent().header("Location", "/games/$gameId").build()
val writeResult = applicationService.execute(gameId, cmd, rps)
return ResponseEntity.noContent().header("Location", "/games/$gameId?version=${writeResult.newStreamVersion}").build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.occurrent.example.domain.rps.decidermodel.web.cqrs.gameplay

import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL
import org.occurrent.dsl.subscription.blocking.EventMetadata
import org.occurrent.dsl.subscription.blocking.Subscriptions
import org.occurrent.dsl.view.materialized
import org.occurrent.dsl.view.updateView
Expand Down Expand Up @@ -50,22 +51,23 @@ sealed interface GameReadModel {
@get:Id
val gameId: GameId
val status: GameStatus
val streamVersion: Long

@TypeAlias("Initialized")
data class Initialized(override val gameId: GameId, val initializedBy: PlayerId, override val status: GameStatus) : GameReadModel
data class Initialized(override val gameId: GameId, val initializedBy: PlayerId, override val status: GameStatus, override val streamVersion: Long) : GameReadModel

@TypeAlias("Ongoing")
data class Ongoing(override val gameId: GameId, val firstMove: Move, val secondMove: Move? = null, override val status: GameStatus) : GameReadModel
data class Ongoing(override val gameId: GameId, val firstMove: Move, val secondMove: Move? = null, override val status: GameStatus, override val streamVersion: Long) : GameReadModel

sealed interface Ended : GameReadModel {
val firstMove: Move
val secondMove: Move

@TypeAlias("Tied")
data class Tied(override val gameId: GameId, override val firstMove: Move, override val secondMove: Move, override val status: GameStatus) : Ended
data class Tied(override val gameId: GameId, override val firstMove: Move, override val secondMove: Move, override val status: GameStatus, override val streamVersion: Long) : Ended

@TypeAlias("Won")
data class Won(override val gameId: GameId, override val firstMove: Move, override val secondMove: Move, val winner: PlayerId, override val status: GameStatus) : Ended
data class Won(override val gameId: GameId, override val firstMove: Move, override val secondMove: Move, val winner: PlayerId, override val status: GameStatus, override val streamVersion: Long) : Ended
}
}

Expand All @@ -77,27 +79,31 @@ class GameViewController(private val mongoOperations: MongoOperations) {
fun showGame(@PathVariable("gameId") gameId: GameId): GameReadModel? = mongoOperations.findById(gameId)
}

private val gameView = view<GameReadModel?, GameEvent>(
private val gameView = view<GameReadModel?, Pair<EventMetadata, GameEvent>>(
initialState = null,
updateState = { game, e ->
updateState = { game, (metadata, e) ->
when (e) {
is NewGameInitiated -> GameReadModel.Initialized(e.gameId, e.playerId, Initialized)
is NewGameInitiated -> GameReadModel.Initialized(e.gameId, e.playerId, Initialized, metadata.streamVersion)
is GameStarted -> game
is HandGestureShown -> when (game) {
is GameReadModel.Initialized -> GameReadModel.Ongoing(e.gameId, firstMove = Move(e.player, e.gesture), status = Ongoing)
is GameReadModel.Initialized -> GameReadModel.Ongoing(
e.gameId, firstMove = Move(e.player, e.gesture),
status = Ongoing, streamVersion = metadata.streamVersion
)

is GameReadModel.Ongoing -> game.copy(secondMove = Move(e.player, e.gesture))
else -> game
}

is GameEnded -> game
is GameTied -> {
val ongoingGame = game as GameReadModel.Ongoing
GameReadModel.Ended.Tied(e.gameId, ongoingGame.firstMove, ongoingGame.secondMove!!, Tied)
GameReadModel.Ended.Tied(e.gameId, ongoingGame.firstMove, ongoingGame.secondMove!!, Tied, streamVersion = metadata.streamVersion)
}

is GameWon -> {
val ongoingGame = game as GameReadModel.Ongoing
GameReadModel.Ended.Won(e.gameId, ongoingGame.firstMove, ongoingGame.secondMove!!, e.winner, Won)
GameReadModel.Ended.Won(e.gameId, ongoingGame.firstMove, ongoingGame.secondMove!!, e.winner, Won, streamVersion = metadata.streamVersion)
}
}
}
Expand All @@ -110,8 +116,11 @@ private class UpdateGameViewWhenGamePlayed(subscriptions: Subscriptions<GameEven
init {
subscriptions.updateView(
viewName = "gameView",
materializedView = gameView.materialized(mongoOperations) { e -> e.gameId }, doBeforeUpdate = { e ->
converter = { eventMetadata, gameEvent ->
eventMetadata to gameEvent
},
materializedView = gameView.materialized(mongoOperations) { (_, e) -> e.gameId }, doBeforeUpdate = { (_, e) ->
log.info("Updating game view for game ${e.gameId} based on ${e::class.simpleName}")
})
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger
class ViewDemo {

@Test
fun `simple inmemory view`() {
fun `simple in-memory view`() {
// Given
val subscriptionModel = InMemorySubscriptionModel(RetryStrategy.none())
val inMemoryEventStore = InMemoryEventStore(subscriptionModel)
Expand All @@ -43,7 +43,7 @@ class ViewDemo {

val numberOfStartedGames = AtomicInteger()
subscriptions(subscriptionModel, cloudEventConverter) {
subscribe<GameCreated> {
subscribe<GameCreated> { _ ->
numberOfStartedGames.incrementAndGet()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class OccurrentModule<C : Any, E : Any, Q : Query<Any>>(

init {
handlers.subscriptionHandlers.forEach { subscriptionHandler ->
subscriptionDSL.subscribe(
subscriptionDSL.subscribe<E>(
subscriptionId = subscriptionHandler.type.simpleName!!,
eventType = subscriptionHandler.type.java
) { e ->
Expand Down

0 comments on commit 7db40d6

Please sign in to comment.