From ac58a8cfaab3f0bd4f32d9fc7397afd163385426 Mon Sep 17 00:00:00 2001 From: d1snin Date: Fri, 27 Sep 2024 02:25:26 +0300 Subject: [PATCH] feat: add per-connection event filter abstraction --- .../d1s/ktor/events/server/EventProcessor.kt | 48 +++++++++---------- .../ktor/events/server/OutgoingEventFilter.kt | 18 +++++++ .../events/server/WebSocketEventsPlugin.kt | 4 ++ .../events/server/dto/WebSocketEventDto.kt | 5 +- .../server/entity/EventSendingConnection.kt | 9 ++-- .../server/route/WebSocketEventsRoute.kt | 2 +- .../events/server/util/InternalComponents.kt | 11 +++++ 7 files changed, 68 insertions(+), 29 deletions(-) create mode 100644 ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/OutgoingEventFilter.kt diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/EventProcessor.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/EventProcessor.kt index b4859de..9449bbc 100644 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/EventProcessor.kt +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/EventProcessor.kt @@ -21,8 +21,8 @@ import dev.d1s.ktor.events.server.dto.WebSocketEventDto import dev.d1s.ktor.events.server.entity.EventSendingConnection import dev.d1s.ktor.events.server.entity.ServerWebSocketEvent import dev.d1s.ktor.events.server.pool.EventPool -import dev.d1s.ktor.events.server.util.clientId import dev.d1s.ktor.events.server.util.eventPool +import dev.d1s.ktor.events.server.util.filter import io.ktor.server.websocket.* import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi @@ -46,7 +46,8 @@ internal class DefaultEventProcessor : EventProcessor { val reference = connection.reference val call = connection.call val pool = call.application.attributes.eventPool - val client = call.clientId + val filter = call.application.attributes.filter + val client = connection.clientId log.d { "Processing events for reference: $reference" @@ -58,30 +59,32 @@ internal class DefaultEventProcessor : EventProcessor { "Sending previously unreceived events: $unreceived" } - connection.sendAll(pool, client, unreceived) + connection.sendAll(pool, filter, client, unreceived) pool.onEvent(reference, client) { log.d { "Got event from event channel: $it" } - connection.sendEvent(pool, client, it) + connection.sendEvent(pool, filter, client, it) } } } private suspend fun EventSendingConnection.sendAll( pool: EventPool, + filter: OutgoingEventFilter?, client: Identifier, events: List ) { events.forEach { - sendEvent(pool, client, it) + sendEvent(pool, filter, client, it) } } private suspend fun EventSendingConnection.sendEvent( pool: EventPool, + filter: OutgoingEventFilter?, client: Identifier, event: ServerWebSocketEvent ) { @@ -90,39 +93,36 @@ internal class DefaultEventProcessor : EventProcessor { } (session as? WebSocketServerSession)?.let { session -> - processSession(pool, client, session, event) + processEvent(pool, filter, client, session, event) } } @OptIn(DelicateCoroutinesApi::class) - private suspend fun EventSendingConnection.processSession( + private suspend fun EventSendingConnection.processEvent( pool: EventPool, + filter: OutgoingEventFilter?, client: Identifier, session: WebSocketServerSession, event: ServerWebSocketEvent ) { if (!session.outgoing.isClosedForSend) { - sendEventDto(session, event) - - pool.confirm(event.id, client) + val dto = WebSocketEventDto( + id = event.id, + reference = reference, + initiated = event.initiated, + data = event.dataSupplier(reference.parameters) + ) + + val filterResolution = filter?.predicate(event, dto, this) ?: true + + if (filterResolution) { + session.sendSerialized(dto) + pool.confirm(event.id, client) + } } else { log.d { "Couldn't send event. Connection is closed." } } } - - private suspend fun EventSendingConnection.sendEventDto( - session: WebSocketServerSession, - event: ServerWebSocketEvent - ) { - val dto = WebSocketEventDto( - id = event.id, - reference = reference, - initiated = event.initiated, - data = event.dataSupplier(reference.parameters) - ) - - session.sendSerialized(dto) - } } \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/OutgoingEventFilter.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/OutgoingEventFilter.kt new file mode 100644 index 0000000..f56ab30 --- /dev/null +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/OutgoingEventFilter.kt @@ -0,0 +1,18 @@ +package dev.d1s.ktor.events.server + +import dev.d1s.ktor.events.server.dto.WebSocketEventDto +import dev.d1s.ktor.events.server.entity.EventSendingConnection +import dev.d1s.ktor.events.server.entity.ServerWebSocketEvent + +/** + * Final filter for outgoing events. + * [WebSocketEventDto] is sent to client only when this [predicate] returns true. Otherwise, the event is ignored for current connection. + */ +public fun interface OutgoingEventFilter { + + public fun predicate( + event: ServerWebSocketEvent, + dto: WebSocketEventDto, + connection: EventSendingConnection + ): Boolean +} \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventsPlugin.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventsPlugin.kt index 7459651..971cddc 100644 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventsPlugin.kt +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventsPlugin.kt @@ -19,6 +19,7 @@ package dev.d1s.ktor.events.server import dev.d1s.ktor.events.server.pool.EventPool import dev.d1s.ktor.events.server.util.eventPool import dev.d1s.ktor.events.server.util.eventProcessor +import dev.d1s.ktor.events.server.util.filter import io.ktor.serialization.jackson.* import io.ktor.server.application.* import io.ktor.server.websocket.* @@ -65,11 +66,14 @@ public val WebSocketEvents: ApplicationPlugin = application.attributes.eventProcessor = DefaultEventProcessor() application.attributes.eventPool = pluginConfig.eventPool ?: error("Event pool must be specified") + application.attributes.filter = pluginConfig.filter } public class WebSocketEventsConfiguration { public var eventPool: EventPool? = null + + public var filter: OutgoingEventFilter? = null } private fun Application.hasWebSocketsPlugin() = pluginOrNull(WebSockets) != null diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/dto/WebSocketEventDto.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/dto/WebSocketEventDto.kt index a991dac..c344bf4 100644 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/dto/WebSocketEventDto.kt +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/dto/WebSocketEventDto.kt @@ -5,7 +5,10 @@ import dev.d1s.ktor.events.commons.EventReference import dev.d1s.ktor.events.commons.Identifier import dev.d1s.ktor.events.commons.UnixTime -internal data class WebSocketEventDto( +/** + * Outgoing event data + */ +public data class WebSocketEventDto( override val id: Identifier, override val reference: EventReference, override val initiated: UnixTime, diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/entity/EventSendingConnection.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/entity/EventSendingConnection.kt index d6507c6..8b2ac2d 100644 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/entity/EventSendingConnection.kt +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/entity/EventSendingConnection.kt @@ -17,11 +17,14 @@ package dev.d1s.ktor.events.server.entity import dev.d1s.ktor.events.commons.EventReference +import dev.d1s.ktor.events.commons.Identifier +import dev.d1s.ktor.events.server.util.clientId import io.ktor.server.application.* -import io.ktor.websocket.* +import io.ktor.server.websocket.* public data class EventSendingConnection( val reference: EventReference, - val session: DefaultWebSocketSession, - val call: ApplicationCall + val session: DefaultWebSocketServerSession, + val call: ApplicationCall = session.call, + val clientId: Identifier = call.clientId ) diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/route/WebSocketEventsRoute.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/route/WebSocketEventsRoute.kt index 54ceb8a..9f49dc6 100644 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/route/WebSocketEventsRoute.kt +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/route/WebSocketEventsRoute.kt @@ -84,7 +84,7 @@ public fun Route.webSocketEvents( preprocess(eventReference) - val connection = EventSendingConnection(eventReference, this, call) + val connection = EventSendingConnection(eventReference, this) processor.process(connection) receive() diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/util/InternalComponents.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/util/InternalComponents.kt index b9f6224..d048c21 100644 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/util/InternalComponents.kt +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/util/InternalComponents.kt @@ -17,6 +17,7 @@ package dev.d1s.ktor.events.server.util import dev.d1s.ktor.events.server.EventProcessor +import dev.d1s.ktor.events.server.OutgoingEventFilter import dev.d1s.ktor.events.server.WEBSOCKET_EVENTS_PLUGIN_NAME import dev.d1s.ktor.events.server.pool.EventPool import io.ktor.util.* @@ -29,10 +30,20 @@ internal var Attributes.eventPool: EventPool get() = this[Key.EventPool] set(value) = this.put(Key.EventPool, value) +internal var Attributes.filter: OutgoingEventFilter? + get() = this[Key.Filter] + set(value) { + value?.let { + this.put(Key.Filter, it) + } + } + private object Key { val EventProcessor = AttributeKey("${WEBSOCKET_EVENTS_PLUGIN_NAME}_event-processor") val EventPool = AttributeKey("${WEBSOCKET_EVENTS_PLUGIN_NAME}_event-pool") + + val Filter = AttributeKey("${WEBSOCKET_EVENTS_PLUGIN_NAME}_filter") } \ No newline at end of file