Skip to content

Commit

Permalink
KAFKA-17563 Move RequestConvertToJson to server module (apache#17223)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <chia7712@apache.org>
  • Loading branch information
xijiu authored Sep 26, 2024
1 parent f8acfa5 commit 18340c9
Show file tree
Hide file tree
Showing 14 changed files with 968 additions and 368 deletions.
4 changes: 4 additions & 0 deletions checkstyle/import-control-server.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,8 @@
<allow pkg="org.apache.kafka.server.authorizer" />
</subpackage>

<subpackage name="network">
<allow pkg="com.fasterxml.jackson" />
</subpackage>

</import-control>
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
<!-- server tests -->
<suppress checks="MethodLength|JavaNCSS|NPath" files="DescribeTopicPartitionsRequestHandlerTest.java"/>
<suppress checks="CyclomaticComplexity" files="ListConsumerGroupTest.java"/>
<suppress checks="ClassFanOutComplexity|CyclomaticComplexity|MethodLength|ParameterNumber|JavaNCSS|ImportControl" files="RequestConvertToJson.java"/>

<!-- Clients -->
<suppress id="dontUseSystemExit"
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ import org.apache.kafka.network.Session
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.network.RequestConvertToJson

import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._
import scala.reflect.ClassTag

object RequestChannel extends Logging {
Expand Down Expand Up @@ -249,7 +251,7 @@ object RequestChannel extends Logging {
recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos))

if (isRequestLoggingEnabled) {
val desc = RequestConvertToJson.requestDescMetrics(header, requestLog, response.responseLog,
val desc = RequestConvertToJson.requestDescMetrics(header, requestLog.asJava, response.responseLog.asJava,
context, session, isForwarded,
totalTimeMs, requestQueueTimeMs, apiLocalTimeMs,
apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs,
Expand Down
267 changes: 0 additions & 267 deletions core/src/main/scala/kafka/network/RequestConvertToJson.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.AlterConfigsRequest._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.test
import org.junit.jupiter.api.Assertions._
Expand All @@ -49,6 +50,7 @@ import java.nio.ByteBuffer
import java.util.Collections
import java.util.concurrent.atomic.AtomicReference
import scala.collection.{Map, Seq}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._

class RequestChannelTest {
Expand All @@ -69,7 +71,7 @@ class RequestChannelTest {
val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.configs.get(resource)
assertEquals(expectedValues, toMap(loggedConfig))
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, alterConfigs.isForwarded).toString
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
}

Expand Down Expand Up @@ -133,7 +135,7 @@ class RequestChannelTest {
val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs
assertEquals(expectedValues, toMap(loggedConfig))
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, alterConfigs.isForwarded).toString
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,91 +21,22 @@ import java.net.InetAddress
import java.nio.ByteBuffer
import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode}
import kafka.network
import kafka.network.RequestConvertToJson.requestHeaderNode
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message._
import org.apache.kafka.common.network.{ClientInformation, ListenerName, NetworkSend}
import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock

import java.util.Collections
import scala.collection.mutable.ArrayBuffer
import scala.compat.java8.OptionConverters._

class RequestConvertToJsonTest {

@Test
def testAllRequestTypesHandled(): Unit = {
val unhandledKeys = ArrayBuffer[String]()
ApiKeys.values().foreach { key => {
val version: Short = key.latestVersion()
val message = key match {
case ApiKeys.DESCRIBE_ACLS =>
ApiMessageType.fromApiKey(key.id).newRequest().asInstanceOf[DescribeAclsRequestData]
.setPatternTypeFilter(1).setResourceTypeFilter(1).setPermissionType(1).setOperation(1)
case _ =>
ApiMessageType.fromApiKey(key.id).newRequest()
}

val bytes = MessageUtil.toByteBuffer(message, version)
val req = AbstractRequest.parseRequest(key, version, bytes).request
try {
RequestConvertToJson.request(req)
} catch {
case _ : IllegalStateException => unhandledKeys += key.toString
}
}}
assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled request keys")
}

@Test
def testAllApiVersionsResponseHandled(): Unit = {

ApiKeys.values().foreach { key => {
val unhandledVersions = ArrayBuffer[java.lang.Short]()
key.allVersions().forEach { version => {
val message = key match {
// Specify top-level error handling for verifying compatibility across versions
case ApiKeys.DESCRIBE_LOG_DIRS =>
ApiMessageType.fromApiKey(key.id).newResponse().asInstanceOf[DescribeLogDirsResponseData]
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())
case _ =>
ApiMessageType.fromApiKey(key.id).newResponse()
}

val bytes = MessageUtil.toByteBuffer(message, version)
val response = AbstractResponse.parseResponse(key, bytes, version)
try {
RequestConvertToJson.response(response, version)
} catch {
case _ : IllegalStateException => unhandledVersions += version
}}
}
assertEquals(ArrayBuffer.empty, unhandledVersions, s"API: ${key.toString} - Unhandled request versions")
}}
}

@Test
def testAllResponseTypesHandled(): Unit = {
val unhandledKeys = ArrayBuffer[String]()
ApiKeys.values().foreach { key => {
val version: Short = key.latestVersion()
val message = ApiMessageType.fromApiKey(key.id).newResponse()
val bytes = MessageUtil.toByteBuffer(message, version)
val res = AbstractResponse.parseResponse(key, bytes, version)
try {
RequestConvertToJson.response(res, version)
} catch {
case _ : IllegalStateException => unhandledKeys += key.toString
}
}}
assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled response keys")
}

@Test
def testRequestHeaderNode(): Unit = {
val alterIsrRequest = new AlterPartitionRequest(new AlterPartitionRequestData(), 0)
Expand Down Expand Up @@ -135,30 +66,17 @@ class RequestConvertToJsonTest {
assertEquals(expectedNode, actualNode)
}

@Test
def testClientInfoNode(): Unit = {
val clientInfo = new ClientInformation("name", "1")

val expectedNode = new ObjectNode(JsonNodeFactory.instance)
expectedNode.set("softwareName", new TextNode(clientInfo.softwareName))
expectedNode.set("softwareVersion", new TextNode(clientInfo.softwareVersion))

val actualNode = RequestConvertToJson.clientInfoNode(clientInfo)

assertEquals(expectedNode, actualNode)
}

@Test
def testRequestDesc(): Unit = {
val alterIsrRequest = new AlterPartitionRequest(new AlterPartitionRequestData(), 0)
val req = request(alterIsrRequest)

val expectedNode = new ObjectNode(JsonNodeFactory.instance)
expectedNode.set("isForwarded", if (req.isForwarded) BooleanNode.TRUE else BooleanNode.FALSE)
expectedNode.set("requestHeader", requestHeaderNode(req.header))
expectedNode.set("requestHeader", RequestConvertToJson.requestHeaderNode(req.header))
expectedNode.set("request", req.requestLog.getOrElse(new TextNode("")))

val actualNode = RequestConvertToJson.requestDesc(req.header, req.requestLog, req.isForwarded)
val actualNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.asJava, req.isForwarded)

assertEquals(expectedNode, actualNode)
}
Expand All @@ -181,7 +99,7 @@ class RequestConvertToJsonTest {
val temporaryMemoryBytes = 8
val messageConversionsTimeMs = 9

val expectedNode = RequestConvertToJson.requestDesc(req.header, req.requestLog, req.isForwarded).asInstanceOf[ObjectNode]
val expectedNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.asJava, req.isForwarded).asInstanceOf[ObjectNode]
expectedNode.set("response", res.responseLog.getOrElse(new TextNode("")))
expectedNode.set("connection", new TextNode(req.context.connectionId))
expectedNode.set("totalTimeMs", new DoubleNode(totalTimeMs))
Expand All @@ -198,7 +116,7 @@ class RequestConvertToJsonTest {
expectedNode.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes))
expectedNode.set("messageConversionsTime", new DoubleNode(messageConversionsTimeMs))

val actualNode = RequestConvertToJson.requestDescMetrics(req.header, req.requestLog, res.responseLog, req.context, req.session, req.isForwarded,
val actualNode = RequestConvertToJson.requestDescMetrics(req.header, req.requestLog.asJava, res.responseLog.asJava, req.context, req.session, req.isForwarded,
totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs,
responseSendTimeMs, temporaryMemoryBytes, messageConversionsTimeMs).asInstanceOf[ObjectNode]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils._
import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestMetrics
import org.apache.kafka.security.CredentialProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.kafka.jmh.common;

import kafka.network.RequestConvertToJson;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.Send;
Expand All @@ -27,6 +25,7 @@
import org.apache.kafka.common.requests.ByteBufferChannel;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.network.RequestConvertToJson;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

package org.apache.kafka.jmh.common;

import kafka.network.RequestConvertToJson;

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.network.RequestConvertToJson;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@

package org.apache.kafka.jmh.common;

import kafka.network.RequestConvertToJson;

import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.network.RequestConvertToJson;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.network.RequestConvertToJson;
import kafka.server.AutoTopicCreationManager;
import kafka.server.ClientQuotaManager;
import kafka.server.ClientRequestQuotaManager;
Expand Down Expand Up @@ -58,6 +57,7 @@
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.FinalizedFeatures;
Expand Down Expand Up @@ -237,7 +237,9 @@ public void testMetadataRequestForAllTopics() {

@Benchmark
public String testRequestToJson() {
return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), allTopicMetadataRequest.requestLog(), allTopicMetadataRequest.isForwarded()).toString();
Option<com.fasterxml.jackson.databind.JsonNode> option = allTopicMetadataRequest.requestLog();
Optional<com.fasterxml.jackson.databind.JsonNode> optional = option.isDefined() ? Optional.of(option.get()) : Optional.empty();
return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional, allTopicMetadataRequest.isForwarded()).toString();
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import kafka.controller.KafkaController;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.network.RequestConvertToJson;
import kafka.server.AutoTopicCreationManager;
import kafka.server.BrokerFeatures;
import kafka.server.ClientQuotaManager;
Expand Down Expand Up @@ -59,6 +58,7 @@
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.server.common.FinalizedFeatures;
import org.apache.kafka.server.common.MetadataVersion;
Expand Down Expand Up @@ -237,7 +237,9 @@ public void testMetadataRequestForAllTopics() {

@Benchmark
public String testRequestToJson() {
return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), allTopicMetadataRequest.requestLog(), allTopicMetadataRequest.isForwarded()).toString();
Option<com.fasterxml.jackson.databind.JsonNode> option = allTopicMetadataRequest.requestLog();
Optional<com.fasterxml.jackson.databind.JsonNode> optional = option.isDefined() ? Optional.of(option.get()) : Optional.empty();
return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional, allTopicMetadataRequest.isForwarded()).toString();
}

@Benchmark
Expand Down
Loading

0 comments on commit 18340c9

Please sign in to comment.