Skip to content

Commit

Permalink
feat: add per-connection event filter abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
d1snin committed Sep 26, 2024
1 parent a088427 commit ac58a8c
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import dev.d1s.ktor.events.server.dto.WebSocketEventDto
import dev.d1s.ktor.events.server.entity.EventSendingConnection
import dev.d1s.ktor.events.server.entity.ServerWebSocketEvent
import dev.d1s.ktor.events.server.pool.EventPool
import dev.d1s.ktor.events.server.util.clientId
import dev.d1s.ktor.events.server.util.eventPool
import dev.d1s.ktor.events.server.util.filter
import io.ktor.server.websocket.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
Expand All @@ -46,7 +46,8 @@ internal class DefaultEventProcessor : EventProcessor {
val reference = connection.reference
val call = connection.call
val pool = call.application.attributes.eventPool
val client = call.clientId
val filter = call.application.attributes.filter
val client = connection.clientId

log.d {
"Processing events for reference: $reference"
Expand All @@ -58,30 +59,32 @@ internal class DefaultEventProcessor : EventProcessor {
"Sending previously unreceived events: $unreceived"
}

connection.sendAll(pool, client, unreceived)
connection.sendAll(pool, filter, client, unreceived)

pool.onEvent(reference, client) {
log.d {
"Got event from event channel: $it"
}

connection.sendEvent(pool, client, it)
connection.sendEvent(pool, filter, client, it)
}
}
}

private suspend fun EventSendingConnection.sendAll(
pool: EventPool,
filter: OutgoingEventFilter?,
client: Identifier,
events: List<ServerWebSocketEvent>
) {
events.forEach {
sendEvent(pool, client, it)
sendEvent(pool, filter, client, it)
}
}

private suspend fun EventSendingConnection.sendEvent(
pool: EventPool,
filter: OutgoingEventFilter?,
client: Identifier,
event: ServerWebSocketEvent
) {
Expand All @@ -90,39 +93,36 @@ internal class DefaultEventProcessor : EventProcessor {
}

(session as? WebSocketServerSession)?.let { session ->
processSession(pool, client, session, event)
processEvent(pool, filter, client, session, event)
}
}

@OptIn(DelicateCoroutinesApi::class)
private suspend fun EventSendingConnection.processSession(
private suspend fun EventSendingConnection.processEvent(
pool: EventPool,
filter: OutgoingEventFilter?,
client: Identifier,
session: WebSocketServerSession,
event: ServerWebSocketEvent
) {
if (!session.outgoing.isClosedForSend) {
sendEventDto(session, event)

pool.confirm(event.id, client)
val dto = WebSocketEventDto(
id = event.id,
reference = reference,
initiated = event.initiated,
data = event.dataSupplier(reference.parameters)
)

val filterResolution = filter?.predicate(event, dto, this) ?: true

if (filterResolution) {
session.sendSerialized(dto)
pool.confirm(event.id, client)
}
} else {
log.d {
"Couldn't send event. Connection is closed."
}
}
}

private suspend fun EventSendingConnection.sendEventDto(
session: WebSocketServerSession,
event: ServerWebSocketEvent
) {
val dto = WebSocketEventDto(
id = event.id,
reference = reference,
initiated = event.initiated,
data = event.dataSupplier(reference.parameters)
)

session.sendSerialized(dto)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package dev.d1s.ktor.events.server

import dev.d1s.ktor.events.server.dto.WebSocketEventDto
import dev.d1s.ktor.events.server.entity.EventSendingConnection
import dev.d1s.ktor.events.server.entity.ServerWebSocketEvent

/**
* Final filter for outgoing events.
* [WebSocketEventDto] is sent to client only when this [predicate] returns true. Otherwise, the event is ignored for current connection.
*/
public fun interface OutgoingEventFilter {

public fun predicate(
event: ServerWebSocketEvent,
dto: WebSocketEventDto,
connection: EventSendingConnection
): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dev.d1s.ktor.events.server
import dev.d1s.ktor.events.server.pool.EventPool
import dev.d1s.ktor.events.server.util.eventPool
import dev.d1s.ktor.events.server.util.eventProcessor
import dev.d1s.ktor.events.server.util.filter
import io.ktor.serialization.jackson.*
import io.ktor.server.application.*
import io.ktor.server.websocket.*
Expand Down Expand Up @@ -65,11 +66,14 @@ public val WebSocketEvents: ApplicationPlugin<WebSocketEventsConfiguration> =

application.attributes.eventProcessor = DefaultEventProcessor()
application.attributes.eventPool = pluginConfig.eventPool ?: error("Event pool must be specified")
application.attributes.filter = pluginConfig.filter
}

public class WebSocketEventsConfiguration {

public var eventPool: EventPool? = null

public var filter: OutgoingEventFilter? = null
}

private fun Application.hasWebSocketsPlugin() = pluginOrNull(WebSockets) != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import dev.d1s.ktor.events.commons.EventReference
import dev.d1s.ktor.events.commons.Identifier
import dev.d1s.ktor.events.commons.UnixTime

internal data class WebSocketEventDto(
/**
* Outgoing event data
*/
public data class WebSocketEventDto(
override val id: Identifier,
override val reference: EventReference,
override val initiated: UnixTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package dev.d1s.ktor.events.server.entity

import dev.d1s.ktor.events.commons.EventReference
import dev.d1s.ktor.events.commons.Identifier
import dev.d1s.ktor.events.server.util.clientId
import io.ktor.server.application.*
import io.ktor.websocket.*
import io.ktor.server.websocket.*

public data class EventSendingConnection(
val reference: EventReference,
val session: DefaultWebSocketSession,
val call: ApplicationCall
val session: DefaultWebSocketServerSession,
val call: ApplicationCall = session.call,
val clientId: Identifier = call.clientId
)
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public fun Route.webSocketEvents(

preprocess(eventReference)

val connection = EventSendingConnection(eventReference, this, call)
val connection = EventSendingConnection(eventReference, this)
processor.process(connection)

receive()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package dev.d1s.ktor.events.server.util

import dev.d1s.ktor.events.server.EventProcessor
import dev.d1s.ktor.events.server.OutgoingEventFilter
import dev.d1s.ktor.events.server.WEBSOCKET_EVENTS_PLUGIN_NAME
import dev.d1s.ktor.events.server.pool.EventPool
import io.ktor.util.*
Expand All @@ -29,10 +30,20 @@ internal var Attributes.eventPool: EventPool
get() = this[Key.EventPool]
set(value) = this.put(Key.EventPool, value)

internal var Attributes.filter: OutgoingEventFilter?
get() = this[Key.Filter]
set(value) {
value?.let {
this.put(Key.Filter, it)
}
}

private object Key {

val EventProcessor =
AttributeKey<EventProcessor>("${WEBSOCKET_EVENTS_PLUGIN_NAME}_event-processor")

val EventPool = AttributeKey<EventPool>("${WEBSOCKET_EVENTS_PLUGIN_NAME}_event-pool")

val Filter = AttributeKey<OutgoingEventFilter>("${WEBSOCKET_EVENTS_PLUGIN_NAME}_filter")
}

0 comments on commit ac58a8c

Please sign in to comment.