Skip to content

Commit

Permalink
Implement body logging for stream responses
Browse files Browse the repository at this point in the history
Fixes problems introduced in Ktor 1.4.2 where JacksonConverter started to return OutputStreamContent

+ Add tests with Jackson and Gson
  • Loading branch information
Koriit committed Jan 15, 2022
1 parent 00856f1 commit 6c00307
Show file tree
Hide file tree
Showing 6 changed files with 391 additions and 25 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ dependencies {

// TESTING
testImplementation "io.ktor:ktor-server-test-host:$ktorVersion"
testImplementation "io.ktor:ktor-jackson:$ktorVersion"
testImplementation "io.ktor:ktor-gson:$ktorVersion"
testImplementation "io.mockk:mockk:1.12.2"
testImplementation "com.koriit.kotlin:slf4j-utils-logback:0.4.0"
testImplementation "org.junit.jupiter:junit-jupiter:5.8.2"
Expand Down
86 changes: 65 additions & 21 deletions src/main/kotlin/com/korrit/kotlin/ktor/features/logging/Logging.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import io.ktor.features.CallId
import io.ktor.features.DoubleReceive
import io.ktor.features.callId
import io.ktor.features.origin
import io.ktor.http.content.OutgoingContent
import io.ktor.http.charset
import io.ktor.request.RequestAlreadyConsumedException
import io.ktor.request.httpMethod
import io.ktor.request.httpVersion
Expand All @@ -22,8 +22,14 @@ import io.ktor.request.receive
import io.ktor.routing.Route
import io.ktor.routing.Routing.Feature.RoutingCallStarted
import io.ktor.util.AttributeKey
import io.ktor.util.pipeline.PipelineContext
import io.ktor.util.pipeline.PipelinePhase
import io.ktor.utils.io.core.readText
import io.ktor.utils.io.readRemaining
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.slf4j.Logger
import java.lang.Exception

/**
* Logging feature. Allows logging performance, requests and responses.
Expand Down Expand Up @@ -127,11 +133,13 @@ open class Logging(config: Configuration) {

if (logBody) {
try {
// new line before body as in HTTP request
// empty line before body as in HTTP request
appendLine()
// have to receive ByteArray for DoubleReceive to work
append(String(call.receive<ByteArray>()))
// new line after body because in the log there might be additional info after "log message"
appendLine(String(call.receive<ByteArray>()))
// and we don't want it to be mixed with logged body
appendLine()
} catch (e: RequestAlreadyConsumedException) {
log.error("Logging payloads requires DoubleReceive feature to be installed with receiveEntireContent=true", e)
}
Expand All @@ -140,28 +148,64 @@ open class Logging(config: Configuration) {
)
}

protected open fun logResponse(call: ApplicationCall, subject: Any) {
protected open suspend fun logResponse(pipeline: PipelineContext<Any, ApplicationCall>): Any {
if (logBody) {
return logResponseWithBody(pipeline)
}

// Since we are not logging response body we can log immediately and continue pipeline normally
log.info(
StringBuilder().apply {
appendLine("Sent response:")
appendLine("${call.request.httpVersion} ${call.response.status()}")
if (logHeaders) {
call.response.headers.allValues().forEach { header, values ->
appendLine("$header: ${values.firstOrNull()}")
}
}
if (logBody && subject is OutgoingContent.ByteArrayContent) {
// new line before body as in HTTP response
appendLine()
// new line after body because in the log there might be additional info after "log message"
appendLine(String(subject.bytes()))
}
// do not log warning if subject is not OutgoingContent.ByteArrayContent
// as we could possibly spam warnings without any option to disable them
appendResponse(pipeline.call)
}.toString()
)

return pipeline.subject
}

/**
* To log response payload we need to duplicate response stream
* which is why this function returns a new pipeline subject to proceed with.
*/
protected open suspend fun logResponseWithBody(pipeline: PipelineContext<Any, ApplicationCall>): Any {
@Suppress("TooGenericExceptionCaught") // intended
try {
// logging a response body is harder than logging a request body because
// there is no public api for observing response body stream or something like DoubleReceive for requests
val (observer, observed) = pipeline.observe()
val charset = observed.contentType?.charset() ?: Charsets.UTF_8
// launch a coroutine that will eventually log the response once it is fully written
pipeline.launch(Dispatchers.Unconfined) {
val responseBody = observer.readRemaining().readText(charset = charset)

log.info(
StringBuilder().apply {
appendResponse(pipeline.call)
// empty line before body as in HTTP response
appendLine()
append(responseBody)
// new line after body because in the log there might be additional info after "log message"
// and we don't want it to be mixed with logged body
appendLine()
}.toString()
)
}
return observed
} catch (e: Exception) {
log.warn(e.message, e)
return pipeline.subject
}
}

protected open fun StringBuilder.appendResponse(call: ApplicationCall) {
appendLine("Sent response:")
appendLine("${call.request.httpVersion} ${call.response.status()}")
if (logHeaders) {
call.response.headers.allValues().forEach { header, values ->
appendLine("$header: ${values.firstOrNull()}")
}
}
}
protected open fun shouldLog(call: ApplicationCall): Boolean {
return filters.isEmpty() || filters.any { it(call) }
}
Expand Down Expand Up @@ -200,7 +244,7 @@ open class Logging(config: Configuration) {

if (logRequests || logResponses) {
if (logBody && pipeline.featureOrNull(DoubleReceive) == null) {
throw IllegalStateException("Logging payloads requires DoubleReceive feature to be installed")
throw IllegalStateException("Logging request payloads requires DoubleReceive feature to be installed")
}
if (!logBody && !logHeaders && !logFullUrl) {
log.warn("You have enabled logging of requests/responses but body, full url and headers logging is disabled and there is no information gain")
Expand All @@ -218,7 +262,7 @@ open class Logging(config: Configuration) {
if (logResponses) {
pipeline.sendPipeline.intercept(responseLoggingPhase) {
if (shouldLog(call)) {
logResponse(call, subject)
proceedWith(logResponse(this))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.korrit.kotlin.ktor.features.logging

import io.ktor.application.ApplicationCall
import io.ktor.http.content.OutgoingContent
import io.ktor.util.pipeline.PipelineContext
import io.ktor.util.split
import io.ktor.utils.io.ByteChannel
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.writeFully
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch

internal class ObservedContent(private val channel: ByteReadChannel) : OutgoingContent.ReadChannelContent() {
override fun readFrom(): ByteReadChannel = channel
}

internal suspend fun PipelineContext<Any, ApplicationCall>.observe(): Pair<ByteReadChannel, OutgoingContent> {
return when (val body = subject) {
is OutgoingContent.ByteArrayContent -> {
val observer = ByteChannel().apply {
writeFully(body.bytes())
close(null)
}

observer to body
}
is OutgoingContent.ReadChannelContent -> {
val (observer, observed) = body.readFrom().split(this)

observer to ObservedContent(observed)
}
is OutgoingContent.WriteChannelContent -> {
val (observer, observed) = body.toReadChannel(this).split(this)

observer to ObservedContent(observed)
}
else -> {
val emptyObserver = ByteChannel().apply {
close(null)
}

emptyObserver to body as OutgoingContent
}
}
}

private fun OutgoingContent.WriteChannelContent.toReadChannel(scope: CoroutineScope): ByteReadChannel {
val channel = ByteChannel()
scope.launch(Dispatchers.Unconfined) {
writeTo(channel)
channel.close(null)
}
return channel
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.korrit.kotlin.ktor.features.logging

import ch.qos.logback.classic.Level
import com.koriit.kotlin.slf4j.logger
import com.koriit.kotlin.slf4j.mdc.correlation.correlateThread
import io.ktor.application.call
Expand Down Expand Up @@ -29,12 +30,14 @@ import java.util.UUID

internal class LoggingTest {

private val testLogger = spyk(logger {})

init {
correlateThread()
testLogger as ch.qos.logback.classic.Logger
testLogger.level = Level.DEBUG
}

private val testLogger = spyk(logger {})

@Test
fun `Should log request, response and performance`() {
val server = testServer {
Expand Down Expand Up @@ -212,7 +215,6 @@ internal class LoggingTest {
val response = payloads[1]
assertTrue(response.contains("200 OK"))
assertTrue(request.contains("/api?queryParam=true"))
assertTrue(response.contains("Content-Length"))
assertTrue(response.contains("SOME_BODY"))

server.stop(0, 0)
Expand Down Expand Up @@ -256,7 +258,6 @@ internal class LoggingTest {
assertTrue(response.contains("200 OK"))
assertTrue(request.contains("/api"))
assertFalse(request.contains("queryParam=true"))
assertFalse(response.contains("Content-Length"))
assertFalse(response.contains("SOME_BODY"))

server.stop(0, 0)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package com.korrit.kotlin.ktor.features.logging

import ch.qos.logback.classic.Level.DEBUG
import com.google.gson.Gson
import com.koriit.kotlin.slf4j.logger
import com.koriit.kotlin.slf4j.mdc.correlation.correlateThread
import io.ktor.application.call
import io.ktor.application.install
import io.ktor.features.CallId
import io.ktor.features.ContentNegotiation
import io.ktor.features.DoubleReceive
import io.ktor.gson.gson
import io.ktor.http.HttpHeaders
import io.ktor.http.HttpMethod.Companion.Post
import io.ktor.request.receive
import io.ktor.response.respond
import io.ktor.routing.post
import io.ktor.routing.routing
import io.ktor.server.engine.applicationEngineEnvironment
import io.ktor.server.testing.TestApplicationEngine
import io.ktor.server.testing.handleRequest
import io.ktor.server.testing.setBody
import io.mockk.spyk
import io.mockk.verify
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import java.util.UUID

internal class LoggingWithGsonTest {

private val testLogger = spyk(logger {})
private val gson = Gson()

private data class SampleRequest(val field: String, val innerObject: SampleRequest? = null)

init {
correlateThread()
testLogger as ch.qos.logback.classic.Logger
testLogger.level = DEBUG
}

@Test
fun `Should log body, full url and headers with Gson`() {
val server = testServer {
logRequests = true
logResponses = true
logFullUrl = true
logBody = true
logHeaders = true
}

server.start()

val reqeust = SampleRequest(field = "value", innerObject = SampleRequest("value2"))

server.handleRequest(Post, "/api?queryParam=true") {
addHeader("My-Header", "My-Value")
addHeader("Content-Type", "application/json")
setBody(gson.toJson(reqeust))
}

val payloads = mutableListOf<String>()

verify(exactly = 2) {
testLogger.info(
withArg {
payloads.add(it)
}
)
}
verify(exactly = 1) {
testLogger.info(any(), *anyVararg())
}

val request = payloads[0]
assertTrue(request.contains("POST"))
assertTrue(request.contains("/api?queryParam=true"))
assertTrue(request.contains("My-Header"))
assertTrue(request.contains("My-Value"))
assertTrue(request.contains("""{"field":"value","innerObject":{"field":"value2"}}"""))

val response = payloads[1]
assertTrue(response.contains("200 OK"))
assertTrue(request.contains("/api?queryParam=true"))
assertTrue(response.contains("""{"field":"value","innerObject":{"field":"value2"}}"""))

server.stop(0, 0)
}

private fun testServer(
installCallId: Boolean = true,
installDoubleReceive: Boolean = true,
rootPath: String = "",
configureLogging: Logging.Configuration.() -> Unit = {}
): TestApplicationEngine {
return TestApplicationEngine(
applicationEngineEnvironment {
this.rootPath = rootPath
module {
if (installCallId) {
install(CallId) {
header(HttpHeaders.XRequestId)
generate { UUID.randomUUID().toString() }
verify { it.isNotBlank() }
}
}
if (installDoubleReceive) {
install(DoubleReceive) {
receiveEntireContent = true
}
}

install(Logging) {
logger = testLogger
configureLogging()
}

install(ContentNegotiation) {
gson {}
}

routing {
post("/api") {
val body: SampleRequest = call.receive()
call.respond(body)
}
}
}
}
)
}
}
Loading

0 comments on commit 6c00307

Please sign in to comment.