Skip to content

Commit

Permalink
Merge pull request #72 from Crusader99/dev
Browse files Browse the repository at this point in the history
Bug fixes for file selector, race conditions, out of screen borders
  • Loading branch information
Crusader99 authored Jul 30, 2021
2 parents 756bf98 + 31f6080 commit cda70ec
Show file tree
Hide file tree
Showing 52 changed files with 571 additions and 550 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package de.hsaalen.cmt.repository

import com.mongodb.client.model.PushOptions
import de.hsaalen.cmt.events.GlobalEventDispatcher
import de.hsaalen.cmt.events.UserDocumentChangeEvent
import de.hsaalen.cmt.events.server.UserDocumentChangeEvent
import de.hsaalen.cmt.mongo.MongoDB
import de.hsaalen.cmt.mongo.TextDocument
import de.hsaalen.cmt.network.dto.objects.LineChangeMode.*
import de.hsaalen.cmt.network.dto.objects.UUID
import de.hsaalen.cmt.network.dto.websocket.DocumentChangeDto
import de.hsaalen.cmt.network.dto.rsocket.DocumentChangeDto
import de.hsaalen.cmt.session.currentSession
import de.hsaalen.cmt.session.senderSocketId
import de.hsaalen.cmt.sql.schema.ReferenceDao
import de.hsaalen.cmt.utils.id
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import org.litote.kmongo.*

/**
Expand All @@ -22,7 +25,12 @@ internal object DocumentRepositoryImpl : DocumentRepository {
*/
override suspend fun modifyDocument(request: DocumentChangeDto) {
val c = MongoDB.collection ?: return
val id = request.uuid // TODO: Ensure user has edit permissions for that document
val id = request.uuid

// Ensure user has permissions to access this document
checkAccess(currentSession.userMail, id)

// Modify document in MongoDB
val newLine = request.lineContent
val allLines = TextDocument::lines
val targetLine = allLines.colProperty.memberWithAdditionalPath(request.lineNumber.toString())
Expand All @@ -35,13 +43,31 @@ internal object DocumentRepositoryImpl : DocumentRepository {
}
}

// Notify event listeners
val event = UserDocumentChangeEvent(request, currentSession.userMail, currentSession.senderSocketId)
GlobalEventDispatcher.notify(event)
}

/**
* Download the content of a specific reference by uuid.
*/
override suspend fun downloadContent(uuid: UUID): String = MongoDB.getDocumentContent(uuid.value)
override suspend fun downloadContent(uuid: UUID): String {
// Ensure user has permissions to access this document
checkAccess(currentSession.userMail, uuid)

// Read document content from MongoDB
return MongoDB.getDocumentContent(uuid.value)
}

/**
* Ensure user has permissions to access the given reference content. Will throw an exception
* when user has no permissions to access the reference.
*/
private suspend fun checkAccess(userMail: String, reference: UUID) {
newSuspendedTransaction {
val ref = ReferenceDao.findById(reference.id) ?: error("Reference not found: $reference")
check(ref.owner.email == userMail) { "No permissions to access document" }
}
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package de.hsaalen.cmt.repository

import de.hsaalen.cmt.events.GlobalEventDispatcher
import de.hsaalen.cmt.events.LabelChangeEvent
import de.hsaalen.cmt.events.server.LabelChangeEvent
import de.hsaalen.cmt.network.dto.objects.LabelChangeMode
import de.hsaalen.cmt.network.dto.objects.UUID
import de.hsaalen.cmt.network.dto.websocket.LabelUpdateDto
import de.hsaalen.cmt.network.dto.rsocket.LabelUpdateDto
import de.hsaalen.cmt.session.currentSession
import de.hsaalen.cmt.sql.schema.*
import de.hsaalen.cmt.utils.id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import de.hsaalen.cmt.network.dto.objects.ContentType
import de.hsaalen.cmt.network.dto.objects.Reference
import de.hsaalen.cmt.network.dto.objects.UUID
import de.hsaalen.cmt.network.dto.server.ServerReferenceListDto
import de.hsaalen.cmt.network.dto.websocket.ReferenceUpdateAddDto
import de.hsaalen.cmt.network.dto.websocket.ReferenceUpdateRemoveDto
import de.hsaalen.cmt.network.dto.rsocket.ReferenceUpdateAddDto
import de.hsaalen.cmt.network.dto.rsocket.ReferenceUpdateRemoveDto
import de.hsaalen.cmt.session.currentSession
import de.hsaalen.cmt.sql.schema.ReferenceDao
import de.hsaalen.cmt.sql.schema.ReferenceTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ object ReferenceTable : UUIDTable("reference") {
val displayName = varchar("display_name", 512)
val contentType = enumeration("content_type", ContentType::class)
val owner = reference("owner", UserTable, onDelete = ReferenceOption.CASCADE)
// val latestRevision = reference("latest_revision", RevisionTable, onDelete = ReferenceOption.RESTRICT)
// TODO fix initialization problem
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import de.hsaalen.cmt.network.dto.client.ClientCreateReferenceDto
import de.hsaalen.cmt.network.dto.objects.ContentType
import de.hsaalen.cmt.network.dto.objects.LineChangeMode.*
import de.hsaalen.cmt.network.dto.objects.Reference
import de.hsaalen.cmt.network.dto.websocket.DocumentChangeDto
import de.hsaalen.cmt.network.dto.rsocket.DocumentChangeDto
import de.hsaalen.cmt.repository.DocumentRepositoryImpl
import de.hsaalen.cmt.repository.ReferenceRepositoryImpl
import org.junit.jupiter.api.BeforeAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ val SessionContext.senderSocketId: String
/**
* Build a websocket session context for the current coroutine.
*/
suspend inline fun withWebSocketSession(userEmail: String, socketId: String, crossinline block: suspend () -> Unit) {
val context = WebSocketContextImpl(userEmail, socketId)
withContext(coroutineContext + SessionContext.threadLocal.asContextElement(context)) {
suspend inline fun <R> withWebSocketSession(userMail: String, socketId: String, crossinline block: suspend () -> R): R {
val context = WebSocketContextImpl(userMail, socketId)
return withContext(coroutineContext + SessionContext.threadLocal.asContextElement(context)) {
block()
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
package de.hsaalen.cmt.events

import de.hsaalen.cmt.network.dto.websocket.ReferenceUpdateEvent
import de.hsaalen.cmt.events.server.LabelChangeEvent
import de.hsaalen.cmt.events.server.SessionCloseEvent
import de.hsaalen.cmt.network.dto.rsocket.ReferenceUpdateEvent
import de.hsaalen.cmt.rsocket.WebSocketManager
import mu.KotlinLogging

/**
* Contains handlers for events that should be synchronized over different websocket instances.
*/
object EventHandlers {

/**
* Local logger instance for this object.
*/
private val logger = KotlinLogging.logger { }

/**
* Initialize required event handlers for synchronization.
*/
fun init() {
GlobalEventDispatcher.createBundle(this) {
register(::handleReferenceUpdate)
register(::handleDocumentChange)
register(::handleLabelChange)
register(::handleSessionClose)
}
}

Expand All @@ -27,17 +35,21 @@ object EventHandlers {
}

/**
* Invoked when user modified lines of a text document.
* Invoked when user adds/removed labels.
*/
private suspend fun handleDocumentChange(event: UserDocumentChangeEvent) {
WebSocketManager.broadcastExcept(event.senderSocketId, event.modification)
private suspend fun handleLabelChange(event: LabelChangeEvent) {
WebSocketManager.broadcast(event.modification)
}

/**
* Invoked when user adds/removed labels.
* Called when user performs logout.
*/
private suspend fun handleLabelChange(event: LabelChangeEvent) {
WebSocketManager.broadcast(event.modification)
private suspend fun handleSessionClose(event: SessionCloseEvent) {
try {
WebSocketManager.disconnect(event.jwtToken)
} catch (ex: Exception) {
logger.warn("Unable to disconnect websockets related to session", ex)
}
}

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package de.hsaalen.cmt.rest.routes

import de.hsaalen.cmt.crypto.fromBase64
import de.hsaalen.cmt.events.GlobalEventDispatcher
import de.hsaalen.cmt.events.server.SessionCloseEvent
import de.hsaalen.cmt.network.*
import de.hsaalen.cmt.network.dto.client.ClientLoginDto
import de.hsaalen.cmt.network.dto.client.ClientRegisterDto
import de.hsaalen.cmt.repository.AuthenticationRepository
import de.hsaalen.cmt.rsocket.WebSocketManager
import de.hsaalen.cmt.session.currentSession
import de.hsaalen.cmt.session.getWithSession
import de.hsaalen.cmt.session.jwt.JwtCookie
import de.hsaalen.cmt.session.jwt.generateJwtToken
import de.hsaalen.cmt.session.jwt.readJwtCookie
import de.hsaalen.cmt.session.jwt.updateJwtCookie
import de.hsaalen.cmt.session.postWithSession
import io.ktor.application.*
import io.ktor.request.*
import io.ktor.response.*
Expand All @@ -37,6 +40,7 @@ fun Routing.routeAuthentication() = route("/" + RestPaths.base) {
call.response.updateJwtCookie(user.generateJwtToken())
call.respond(user)
}

post(apiPathAuthRegister) {
val request: ClientRegisterDto = call.receive()
logger.info("Register new account with e-mail= " + request.email)
Expand All @@ -45,23 +49,20 @@ fun Routing.routeAuthentication() = route("/" + RestPaths.base) {
call.response.updateJwtCookie(user.generateJwtToken())
call.respond(user)
}
post(apiPathAuthLogout) {

postWithSession(apiPathAuthLogout) { // Automatically checks if authorization cookie is valid
// Disconnect currently connected web sockets
val jwtToken = call.request.cookies[JwtCookie.cookieName]
if (jwtToken != null) {
try {
JwtCookie.verifyToken(jwtToken) // Throws exception when JWT token invalid
WebSocketManager.disconnect(jwtToken) // Disconnect all web-sockets from this session
} catch (ex: Exception) {
logger.warn("Unable to disconnect websockets related to session", ex)
}
call.request.cookies[JwtCookie.cookieName]?.let { jwtToken ->
// Notify event handlers to allow disconnecting all web-sockets from this session
GlobalEventDispatcher.notify(SessionCloseEvent(currentSession.userMail, jwtToken))
}

// Reset cookie using http header
call.response.cookies.appendExpired(name = JwtCookie.cookieName, path = "/", domain = "")
call.respond(Unit)
}
// Check authorization cookie is set and refresh JWT token when logged in

// Check authorization cookie is valid and refresh JWT token when logged in
getWithSession(apiPathAuthRestore) {
val payload = call.request.readJwtCookie()
val user = repo.restore(payload.email)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,4 @@ fun Routing.routeReferences() = route("/" + RestPaths.base) {
stream.copyTo(this)
}
}
postWithSession(apiPathImport) { // TODO: update or remove
val multipart = call.receiveMultipart()
multipart.forEachPart { part ->
// Get all file parts of this multipart
if (part is PartData.FileItem) {
val fileName = part.originalFileName ?: "unknown"
val fileContent = part.streamProvider()
}
// Close part to prevent memory leaks
part.dispose()
}
call.respondText("Imported")
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package de.hsaalen.cmt.rsocket

import de.hsaalen.cmt.events.GlobalEventDispatcher
import de.hsaalen.cmt.events.server.UserDocumentChangeEvent
import de.hsaalen.cmt.extensions.launch
import de.hsaalen.cmt.network.dto.objects.LabelChangeMode
import de.hsaalen.cmt.network.dto.websocket.DocumentChangeDto
import de.hsaalen.cmt.network.dto.websocket.LabelUpdateDto
import de.hsaalen.cmt.network.dto.websocket.LiveDto
import de.hsaalen.cmt.network.dto.objects.LineChangeMode
import de.hsaalen.cmt.network.dto.rsocket.DocumentChangeDto
import de.hsaalen.cmt.network.dto.rsocket.LabelUpdateDto
import de.hsaalen.cmt.network.dto.rsocket.LiveDto
import de.hsaalen.cmt.network.dto.rsocket.RequestDocumentDto
import de.hsaalen.cmt.repository.DocumentRepository
import de.hsaalen.cmt.repository.LabelRepository
import de.hsaalen.cmt.session.jwt.JwtPayload
import de.hsaalen.cmt.session.withWebSocketSession
import de.hsaalen.cmt.utils.SerializeHelper
import de.hsaalen.cmt.utils.buildPayload
import de.hsaalen.cmt.utils.decodeProtobufData
import io.ktor.routing.*
import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.RSocketRequestHandler
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.withTimeout
import mu.KLogger
import mu.KotlinLogging
Expand All @@ -39,7 +46,7 @@ class Connection(socket: RSocket, private val payload: JwtPayload, val jwtToken:
/**
* A unique id for this socket instance.
*/
val socketId: String
private val socketId: String

init {
// Calculate unique id of this connection
Expand Down Expand Up @@ -73,6 +80,48 @@ class Connection(socket: RSocket, private val payload: JwtPayload, val jwtToken:
}
}
}

requestChannel { init, input ->
logger.info("got requestChannel")
withWebSocketSession(userEmail, socketId) {
val request: RequestDocumentDto = init.decodeProtobufData()
val documentUUID = request.reference
val docRepo: DocumentRepository by route.inject()
val events = GlobalEventDispatcher.createBundle(this)

events.launch {
withWebSocketSession(userEmail, socketId) {
input.collect {
try {
val dto: DocumentChangeDto = it.decodeProtobufData()
docRepo.modifyDocument(dto)
} catch (ex: Exception) {
logger.error("Unable to handle document change", ex)
}
}
}
}

val documentFlow = docRepo.downloadContent(documentUUID)
.lineSequence()
.mapIndexed { index, line ->
val mode = if (index == 0) LineChangeMode.MODIFY else LineChangeMode.ADD
DocumentChangeDto(documentUUID, index, line, mode)
}.asFlow()
val eventFlow = events.receiveEventsAsFlow<UserDocumentChangeEvent>()
.filter { it.senderSocketId != socketId }
.map { it.modification }
.filter { it.uuid == documentUUID }

channelFlow {
documentFlow.collect { send(it) }
eventFlow.collect { send(it) }
}.onCompletion {
logger.info("Cancel document editing")
events.unregisterAll()
}.map { it.buildPayload() }
}
}
job.invokeOnCompletion {
WebSocketManager.connections -= this@Connection
logger.info("Websocket disconnected")
Expand Down
Loading

0 comments on commit cda70ec

Please sign in to comment.