From 6c0030762566b1df5f1f22af654cc9aef114b5ed Mon Sep 17 00:00:00 2001 From: Aleksander Stelmaczonek Date: Sat, 15 Jan 2022 22:57:36 +0100 Subject: [PATCH] Implement body logging for stream responses Fixes problems introduced in Ktor 1.4.2 where JacksonConverter started to return OutputStreamContent + Add tests with Jackson and Gson --- build.gradle | 2 + .../kotlin/ktor/features/logging/Logging.kt | 86 +++++++++--- .../ktor/features/logging/ObservedContent.kt | 55 ++++++++ .../ktor/features/logging/LoggingTest.kt | 9 +- .../features/logging/LoggingWithGsonTest.kt | 132 ++++++++++++++++++ .../logging/LoggingWithJacksonTest.kt | 132 ++++++++++++++++++ 6 files changed, 391 insertions(+), 25 deletions(-) create mode 100644 src/main/kotlin/com/korrit/kotlin/ktor/features/logging/ObservedContent.kt create mode 100644 src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingWithGsonTest.kt create mode 100644 src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingWithJacksonTest.kt diff --git a/build.gradle b/build.gradle index a6a17e4..9cb2945 100644 --- a/build.gradle +++ b/build.gradle @@ -37,6 +37,8 @@ dependencies { // TESTING testImplementation "io.ktor:ktor-server-test-host:$ktorVersion" + testImplementation "io.ktor:ktor-jackson:$ktorVersion" + testImplementation "io.ktor:ktor-gson:$ktorVersion" testImplementation "io.mockk:mockk:1.12.2" testImplementation "com.koriit.kotlin:slf4j-utils-logback:0.4.0" testImplementation "org.junit.jupiter:junit-jupiter:5.8.2" diff --git a/src/main/kotlin/com/korrit/kotlin/ktor/features/logging/Logging.kt b/src/main/kotlin/com/korrit/kotlin/ktor/features/logging/Logging.kt index 7de82ac..87a183a 100644 --- a/src/main/kotlin/com/korrit/kotlin/ktor/features/logging/Logging.kt +++ b/src/main/kotlin/com/korrit/kotlin/ktor/features/logging/Logging.kt @@ -13,7 +13,7 @@ import io.ktor.features.CallId import io.ktor.features.DoubleReceive import io.ktor.features.callId import io.ktor.features.origin -import io.ktor.http.content.OutgoingContent +import io.ktor.http.charset import io.ktor.request.RequestAlreadyConsumedException import io.ktor.request.httpMethod import io.ktor.request.httpVersion @@ -22,8 +22,14 @@ import io.ktor.request.receive import io.ktor.routing.Route import io.ktor.routing.Routing.Feature.RoutingCallStarted import io.ktor.util.AttributeKey +import io.ktor.util.pipeline.PipelineContext import io.ktor.util.pipeline.PipelinePhase +import io.ktor.utils.io.core.readText +import io.ktor.utils.io.readRemaining +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.slf4j.Logger +import java.lang.Exception /** * Logging feature. Allows logging performance, requests and responses. @@ -127,11 +133,13 @@ open class Logging(config: Configuration) { if (logBody) { try { - // new line before body as in HTTP request + // empty line before body as in HTTP request appendLine() // have to receive ByteArray for DoubleReceive to work + append(String(call.receive())) // new line after body because in the log there might be additional info after "log message" - appendLine(String(call.receive())) + // and we don't want it to be mixed with logged body + appendLine() } catch (e: RequestAlreadyConsumedException) { log.error("Logging payloads requires DoubleReceive feature to be installed with receiveEntireContent=true", e) } @@ -140,28 +148,64 @@ open class Logging(config: Configuration) { ) } - protected open fun logResponse(call: ApplicationCall, subject: Any) { + protected open suspend fun logResponse(pipeline: PipelineContext): Any { + if (logBody) { + return logResponseWithBody(pipeline) + } + + // Since we are not logging response body we can log immediately and continue pipeline normally log.info( StringBuilder().apply { - appendLine("Sent response:") - appendLine("${call.request.httpVersion} ${call.response.status()}") - if (logHeaders) { - call.response.headers.allValues().forEach { header, values -> - appendLine("$header: ${values.firstOrNull()}") - } - } - if (logBody && subject is OutgoingContent.ByteArrayContent) { - // new line before body as in HTTP response - appendLine() - // new line after body because in the log there might be additional info after "log message" - appendLine(String(subject.bytes())) - } - // do not log warning if subject is not OutgoingContent.ByteArrayContent - // as we could possibly spam warnings without any option to disable them + appendResponse(pipeline.call) }.toString() ) + + return pipeline.subject } + /** + * To log response payload we need to duplicate response stream + * which is why this function returns a new pipeline subject to proceed with. + */ + protected open suspend fun logResponseWithBody(pipeline: PipelineContext): Any { + @Suppress("TooGenericExceptionCaught") // intended + try { + // logging a response body is harder than logging a request body because + // there is no public api for observing response body stream or something like DoubleReceive for requests + val (observer, observed) = pipeline.observe() + val charset = observed.contentType?.charset() ?: Charsets.UTF_8 + // launch a coroutine that will eventually log the response once it is fully written + pipeline.launch(Dispatchers.Unconfined) { + val responseBody = observer.readRemaining().readText(charset = charset) + + log.info( + StringBuilder().apply { + appendResponse(pipeline.call) + // empty line before body as in HTTP response + appendLine() + append(responseBody) + // new line after body because in the log there might be additional info after "log message" + // and we don't want it to be mixed with logged body + appendLine() + }.toString() + ) + } + return observed + } catch (e: Exception) { + log.warn(e.message, e) + return pipeline.subject + } + } + + protected open fun StringBuilder.appendResponse(call: ApplicationCall) { + appendLine("Sent response:") + appendLine("${call.request.httpVersion} ${call.response.status()}") + if (logHeaders) { + call.response.headers.allValues().forEach { header, values -> + appendLine("$header: ${values.firstOrNull()}") + } + } + } protected open fun shouldLog(call: ApplicationCall): Boolean { return filters.isEmpty() || filters.any { it(call) } } @@ -200,7 +244,7 @@ open class Logging(config: Configuration) { if (logRequests || logResponses) { if (logBody && pipeline.featureOrNull(DoubleReceive) == null) { - throw IllegalStateException("Logging payloads requires DoubleReceive feature to be installed") + throw IllegalStateException("Logging request payloads requires DoubleReceive feature to be installed") } if (!logBody && !logHeaders && !logFullUrl) { log.warn("You have enabled logging of requests/responses but body, full url and headers logging is disabled and there is no information gain") @@ -218,7 +262,7 @@ open class Logging(config: Configuration) { if (logResponses) { pipeline.sendPipeline.intercept(responseLoggingPhase) { if (shouldLog(call)) { - logResponse(call, subject) + proceedWith(logResponse(this)) } } } diff --git a/src/main/kotlin/com/korrit/kotlin/ktor/features/logging/ObservedContent.kt b/src/main/kotlin/com/korrit/kotlin/ktor/features/logging/ObservedContent.kt new file mode 100644 index 0000000..0c78d42 --- /dev/null +++ b/src/main/kotlin/com/korrit/kotlin/ktor/features/logging/ObservedContent.kt @@ -0,0 +1,55 @@ +package com.korrit.kotlin.ktor.features.logging + +import io.ktor.application.ApplicationCall +import io.ktor.http.content.OutgoingContent +import io.ktor.util.pipeline.PipelineContext +import io.ktor.util.split +import io.ktor.utils.io.ByteChannel +import io.ktor.utils.io.ByteReadChannel +import io.ktor.utils.io.writeFully +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch + +internal class ObservedContent(private val channel: ByteReadChannel) : OutgoingContent.ReadChannelContent() { + override fun readFrom(): ByteReadChannel = channel +} + +internal suspend fun PipelineContext.observe(): Pair { + return when (val body = subject) { + is OutgoingContent.ByteArrayContent -> { + val observer = ByteChannel().apply { + writeFully(body.bytes()) + close(null) + } + + observer to body + } + is OutgoingContent.ReadChannelContent -> { + val (observer, observed) = body.readFrom().split(this) + + observer to ObservedContent(observed) + } + is OutgoingContent.WriteChannelContent -> { + val (observer, observed) = body.toReadChannel(this).split(this) + + observer to ObservedContent(observed) + } + else -> { + val emptyObserver = ByteChannel().apply { + close(null) + } + + emptyObserver to body as OutgoingContent + } + } +} + +private fun OutgoingContent.WriteChannelContent.toReadChannel(scope: CoroutineScope): ByteReadChannel { + val channel = ByteChannel() + scope.launch(Dispatchers.Unconfined) { + writeTo(channel) + channel.close(null) + } + return channel +} diff --git a/src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingTest.kt b/src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingTest.kt index d7c7d20..0ee0c90 100644 --- a/src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingTest.kt +++ b/src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingTest.kt @@ -1,5 +1,6 @@ package com.korrit.kotlin.ktor.features.logging +import ch.qos.logback.classic.Level import com.koriit.kotlin.slf4j.logger import com.koriit.kotlin.slf4j.mdc.correlation.correlateThread import io.ktor.application.call @@ -29,12 +30,14 @@ import java.util.UUID internal class LoggingTest { + private val testLogger = spyk(logger {}) + init { correlateThread() + testLogger as ch.qos.logback.classic.Logger + testLogger.level = Level.DEBUG } - private val testLogger = spyk(logger {}) - @Test fun `Should log request, response and performance`() { val server = testServer { @@ -212,7 +215,6 @@ internal class LoggingTest { val response = payloads[1] assertTrue(response.contains("200 OK")) assertTrue(request.contains("/api?queryParam=true")) - assertTrue(response.contains("Content-Length")) assertTrue(response.contains("SOME_BODY")) server.stop(0, 0) @@ -256,7 +258,6 @@ internal class LoggingTest { assertTrue(response.contains("200 OK")) assertTrue(request.contains("/api")) assertFalse(request.contains("queryParam=true")) - assertFalse(response.contains("Content-Length")) assertFalse(response.contains("SOME_BODY")) server.stop(0, 0) diff --git a/src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingWithGsonTest.kt b/src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingWithGsonTest.kt new file mode 100644 index 0000000..bc87afd --- /dev/null +++ b/src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingWithGsonTest.kt @@ -0,0 +1,132 @@ +package com.korrit.kotlin.ktor.features.logging + +import ch.qos.logback.classic.Level.DEBUG +import com.google.gson.Gson +import com.koriit.kotlin.slf4j.logger +import com.koriit.kotlin.slf4j.mdc.correlation.correlateThread +import io.ktor.application.call +import io.ktor.application.install +import io.ktor.features.CallId +import io.ktor.features.ContentNegotiation +import io.ktor.features.DoubleReceive +import io.ktor.gson.gson +import io.ktor.http.HttpHeaders +import io.ktor.http.HttpMethod.Companion.Post +import io.ktor.request.receive +import io.ktor.response.respond +import io.ktor.routing.post +import io.ktor.routing.routing +import io.ktor.server.engine.applicationEngineEnvironment +import io.ktor.server.testing.TestApplicationEngine +import io.ktor.server.testing.handleRequest +import io.ktor.server.testing.setBody +import io.mockk.spyk +import io.mockk.verify +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import java.util.UUID + +internal class LoggingWithGsonTest { + + private val testLogger = spyk(logger {}) + private val gson = Gson() + + private data class SampleRequest(val field: String, val innerObject: SampleRequest? = null) + + init { + correlateThread() + testLogger as ch.qos.logback.classic.Logger + testLogger.level = DEBUG + } + + @Test + fun `Should log body, full url and headers with Gson`() { + val server = testServer { + logRequests = true + logResponses = true + logFullUrl = true + logBody = true + logHeaders = true + } + + server.start() + + val reqeust = SampleRequest(field = "value", innerObject = SampleRequest("value2")) + + server.handleRequest(Post, "/api?queryParam=true") { + addHeader("My-Header", "My-Value") + addHeader("Content-Type", "application/json") + setBody(gson.toJson(reqeust)) + } + + val payloads = mutableListOf() + + verify(exactly = 2) { + testLogger.info( + withArg { + payloads.add(it) + } + ) + } + verify(exactly = 1) { + testLogger.info(any(), *anyVararg()) + } + + val request = payloads[0] + assertTrue(request.contains("POST")) + assertTrue(request.contains("/api?queryParam=true")) + assertTrue(request.contains("My-Header")) + assertTrue(request.contains("My-Value")) + assertTrue(request.contains("""{"field":"value","innerObject":{"field":"value2"}}""")) + + val response = payloads[1] + assertTrue(response.contains("200 OK")) + assertTrue(request.contains("/api?queryParam=true")) + assertTrue(response.contains("""{"field":"value","innerObject":{"field":"value2"}}""")) + + server.stop(0, 0) + } + + private fun testServer( + installCallId: Boolean = true, + installDoubleReceive: Boolean = true, + rootPath: String = "", + configureLogging: Logging.Configuration.() -> Unit = {} + ): TestApplicationEngine { + return TestApplicationEngine( + applicationEngineEnvironment { + this.rootPath = rootPath + module { + if (installCallId) { + install(CallId) { + header(HttpHeaders.XRequestId) + generate { UUID.randomUUID().toString() } + verify { it.isNotBlank() } + } + } + if (installDoubleReceive) { + install(DoubleReceive) { + receiveEntireContent = true + } + } + + install(Logging) { + logger = testLogger + configureLogging() + } + + install(ContentNegotiation) { + gson {} + } + + routing { + post("/api") { + val body: SampleRequest = call.receive() + call.respond(body) + } + } + } + } + ) + } +} diff --git a/src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingWithJacksonTest.kt b/src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingWithJacksonTest.kt new file mode 100644 index 0000000..2567047 --- /dev/null +++ b/src/test/kotlin/com/korrit/kotlin/ktor/features/logging/LoggingWithJacksonTest.kt @@ -0,0 +1,132 @@ +package com.korrit.kotlin.ktor.features.logging + +import ch.qos.logback.classic.Level.DEBUG +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.koriit.kotlin.slf4j.logger +import com.koriit.kotlin.slf4j.mdc.correlation.correlateThread +import io.ktor.application.call +import io.ktor.application.install +import io.ktor.features.CallId +import io.ktor.features.ContentNegotiation +import io.ktor.features.DoubleReceive +import io.ktor.http.HttpHeaders +import io.ktor.http.HttpMethod.Companion.Post +import io.ktor.jackson.jackson +import io.ktor.request.receive +import io.ktor.response.respond +import io.ktor.routing.post +import io.ktor.routing.routing +import io.ktor.server.engine.applicationEngineEnvironment +import io.ktor.server.testing.TestApplicationEngine +import io.ktor.server.testing.handleRequest +import io.ktor.server.testing.setBody +import io.mockk.spyk +import io.mockk.verify +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import java.util.UUID + +internal class LoggingWithJacksonTest { + + private val testLogger = spyk(logger {}) + private val jackson = jacksonObjectMapper() + + private data class SampleRequest(val field: String, val innerObject: SampleRequest? = null) + + init { + correlateThread() + testLogger as ch.qos.logback.classic.Logger + testLogger.level = DEBUG + } + + @Test + fun `Should log body, full url and headers with Jackson`() { + val server = testServer { + logRequests = true + logResponses = true + logFullUrl = true + logBody = true + logHeaders = true + } + + server.start() + + val reqeust = SampleRequest(field = "value", innerObject = SampleRequest("value2")) + + server.handleRequest(Post, "/api?queryParam=true") { + addHeader("My-Header", "My-Value") + addHeader("Content-Type", "application/json") + setBody(jackson.writeValueAsString(reqeust)) + } + + val payloads = mutableListOf() + + verify(exactly = 2) { + testLogger.info( + withArg { + payloads.add(it) + } + ) + } + verify(exactly = 1) { + testLogger.info(any(), *anyVararg()) + } + + val request = payloads[0] + assertTrue(request.contains("POST")) + assertTrue(request.contains("/api?queryParam=true")) + assertTrue(request.contains("My-Header")) + assertTrue(request.contains("My-Value")) + assertTrue(request.contains("""{"field":"value","innerObject":{"field":"value2","innerObject":null}}""")) + + val response = payloads[1] + assertTrue(response.contains("200 OK")) + assertTrue(request.contains("/api?queryParam=true")) + assertTrue(response.contains("""{"field":"value","innerObject":{"field":"value2","innerObject":null}}""")) + + server.stop(0, 0) + } + + private fun testServer( + installCallId: Boolean = true, + installDoubleReceive: Boolean = true, + rootPath: String = "", + configureLogging: Logging.Configuration.() -> Unit = {} + ): TestApplicationEngine { + return TestApplicationEngine( + applicationEngineEnvironment { + this.rootPath = rootPath + module { + if (installCallId) { + install(CallId) { + header(HttpHeaders.XRequestId) + generate { UUID.randomUUID().toString() } + verify { it.isNotBlank() } + } + } + if (installDoubleReceive) { + install(DoubleReceive) { + receiveEntireContent = true + } + } + + install(Logging) { + logger = testLogger + configureLogging() + } + + install(ContentNegotiation) { + jackson {} + } + + routing { + post("/api") { + val body: SampleRequest = call.receive() + call.respond(body) + } + } + } + } + ) + } +}