From 137de5ea1f4a8ca15a1a5145154218a14b440d45 Mon Sep 17 00:00:00 2001 From: John Fallows Date: Tue, 2 Jul 2024 22:11:01 -0700 Subject: [PATCH] Enhance Kafka event descriptions (#1127) --- .../kafka/internal/events/KafkaApiKey.java | 130 ++++++++++++++++++ .../{ => events}/KafkaEventContext.java | 3 +- .../{ => events}/KafkaEventFormatter.java | 8 +- .../KafkaEventFormatterFactory.java | 3 +- .../stream/KafkaClientProduceFactory.java | 2 +- .../stream/KafkaClientSaslHandshaker.java | 2 +- .../src/main/moditect/module-info.java | 2 +- ...time.engine.event.EventFormatterFactorySpi | 2 +- .../client.event.api.version.rejected.yaml | 2 +- ...nt.event.cluster.authorization.failed.yaml | 2 +- 10 files changed, 145 insertions(+), 11 deletions(-) create mode 100644 runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaApiKey.java rename runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/{ => events}/KafkaEventContext.java (97%) rename runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/{ => events}/KafkaEventFormatter.java (88%) rename runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/{ => events}/KafkaEventFormatterFactory.java (88%) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaApiKey.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaApiKey.java new file mode 100644 index 0000000000..f05ddb0e3a --- /dev/null +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaApiKey.java @@ -0,0 +1,130 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.kafka.internal.events; + +import java.util.regex.Pattern; + +public enum KafkaApiKey +{ + PRODUCE(0), + FETCH(1), + LIST_OFFSETS(2), + METADATA(3), + LEADER_AND_ISR(4), + STOP_REPLICA(5), + UPDATE_METADATA(6), + CONTROLLED_SHUTDOWN(7), + OFFSET_COMMIT(8), + OFFSET_FETCH(9), + FIND_COORDINATOR(10), + JOIN_GROUP(11), + HEARTBEAT(12), + LEAVE_GROUP(13), + SYNC_GROUP(14), + DESCRIBE_GROUPS(15), + LIST_GROUPS(16), + SASL_HANDSHAKE(17), + API_VERSIONS(18), + CREATE_TOPICS(19), + DELETE_TOPICS(20), + DELETE_RECORDS(21), + INIT_PRODUCER_ID(22), + OFFSET_FOR_LEADER_EPOCH(23), + ADD_PARTITIONS_TO_TXN(24), + ADD_OFFSETS_TO_TXN(25), + END_TXN(26), + WRITE_TXN_MARKERS(27), + TXN_OFFSET_COMMIT(28), + DESCRIBE_ACLS(29), + CREATE_ACLS(30), + DELETE_ACLS(31), + DESCRIBE_CONFIGS(32), + ALTER_CONFIGS(33), + ALTER_REPLICA_LOG_DIRS(34), + DESCRIBE_LOG_DIRS(35), + SASL_AUTHENTICATE(36), + CREATE_PARTITIONS(37), + CREATE_DELEGATION_TOKEN(38), + RENEW_DELEGATION_TOKEN(39), + EXPIRE_DELEGATION_TOKEN(40), + DESCRIBE_DELEGATION_TOKEN(41), + DELETE_GROUPS(42), + ELECT_LEADERS(43), + INCREMENTAL_ALTER_CONFIGS(44), + ALTER_PARTITION_REASSIGNMENTS(45), + LIST_PARTITION_REASSIGNMENTS(46), + OFFSET_DELETE(47), + DESCRIBE_CLIENT_QUOTAS(48), + ALTER_CLIENT_QUOTAS(49), + DESCRIBE_USER_SCRAM_CREDENTIALS(50), + ALTER_USER_SCRAM_CREDENTIALS(51), + DESCRIBE_QUORUM(55), + ALTER_PARTITION(56), + UPDATE_FEATURES(57), + ENVELOPE(58), + DESCRIBE_CLUSTER(60), + DESCRIBE_PRODUCERS(61), + UNREGISTER_BROKER(64), + DESCRIBE_TRANSACTIONS(65), + LIST_TRANSACTIONS(66), + ALLOCATE_PRODUCER_IDS(67), + CONSUMER_GROUP_HEARTBEAT(68), + CONSUMER_GROUP_DESCRIBE(69), + GET_TELEMETRY_SUBSCRIPTIONS(71), + PUSH_TELEMETRY(72), + LIST_CLIENT_METRICS_RESOURCES(74); + + private final int key; + private final String title; + + KafkaApiKey( + int key) + { + this.key = key; + this.title = toTitleCase(name()); + } + + public String title() + { + return title; + } + + public static KafkaApiKey of( + int key) + { + KafkaApiKey value = null; + + final KafkaApiKey[] values = KafkaApiKey.values(); + for (int i = 0; i < values.length; i++) + { + if (values[i].key == key) + { + value = values[i]; + break; + } + } + + return value; + } + + private static String toTitleCase( + String name) + { + return Pattern.compile("(?:_|^)([a-z])") + .matcher(name.toLowerCase()) + .replaceAll(m -> m.group(1).toUpperCase()); + } +} diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaEventContext.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java similarity index 97% rename from runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaEventContext.java rename to runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java index 6231b0c9c0..9ccab9d1a5 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaEventContext.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.aklivity.zilla.runtime.binding.kafka.internal; +package io.aklivity.zilla.runtime.binding.kafka.internal.events; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.API_VERSION_REJECTED; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.AUTHORIZATION_FAILED; @@ -25,6 +25,7 @@ import org.agrona.concurrent.AtomicBuffer; import org.agrona.concurrent.UnsafeBuffer; +import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.EventFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventExFW; import io.aklivity.zilla.runtime.engine.EngineContext; diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaEventFormatter.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java similarity index 88% rename from runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaEventFormatter.java rename to runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java index 9747f8c595..7eb8fa589c 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaEventFormatter.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.aklivity.zilla.runtime.binding.kafka.internal; +package io.aklivity.zilla.runtime.binding.kafka.internal.events; import org.agrona.DirectBuffer; @@ -56,13 +56,15 @@ public String format( case API_VERSION_REJECTED: { final KafkaApiVersionRejectedExFW ex = extension.apiVersionRejected(); - result = String.format("%d %d", ex.apiKey(), ex.apiVersion()); + KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey()); + result = String.format("%s (Version: %d)", apiKey.title(), ex.apiVersion()); break; } case CLUSTER_AUTHORIZATION_FAILED: { final KafkaClusterAuthorizationFailedExFW ex = extension.clusterAuthorizationFailed(); - result = String.format("%d %d", ex.apiKey(), ex.apiVersion()); + KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey()); + result = String.format("%s (Version: %d)", apiKey.title(), ex.apiVersion()); break; } } diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaEventFormatterFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatterFactory.java similarity index 88% rename from runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaEventFormatterFactory.java rename to runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatterFactory.java index f522ba7851..38d1cefb67 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaEventFormatterFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatterFactory.java @@ -13,8 +13,9 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.aklivity.zilla.runtime.binding.kafka.internal; +package io.aklivity.zilla.runtime.binding.kafka.internal.events; +import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding; import io.aklivity.zilla.runtime.engine.Configuration; import io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi; diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java index 8f4b71cad3..77af1b77a1 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java @@ -41,9 +41,9 @@ import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding; import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration; -import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaEventContext; import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig; +import io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventContext; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaAckMode; diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java index 20e3b73c1f..91413c555d 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java @@ -43,8 +43,8 @@ import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig; import io.aklivity.zilla.runtime.binding.kafka.identity.KafkaClientIdSupplier; import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration; -import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaEventContext; import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaScramMechanism; +import io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventContext; import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.RequestHeaderFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.ResponseHeaderFW; diff --git a/runtime/binding-kafka/src/main/moditect/module-info.java b/runtime/binding-kafka/src/main/moditect/module-info.java index 1191e87119..0abc209ebd 100644 --- a/runtime/binding-kafka/src/main/moditect/module-info.java +++ b/runtime/binding-kafka/src/main/moditect/module-info.java @@ -36,5 +36,5 @@ with io.aklivity.zilla.runtime.binding.kafka.internal.identity.KafkaConfluentClientIdSupplierFactory; provides io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi - with io.aklivity.zilla.runtime.binding.kafka.internal.KafkaEventFormatterFactory; + with io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventFormatterFactory; } diff --git a/runtime/binding-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi b/runtime/binding-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi index 6659873202..ad42f922c6 100644 --- a/runtime/binding-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi +++ b/runtime/binding-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi @@ -1 +1 @@ -io.aklivity.zilla.runtime.binding.kafka.internal.KafkaEventFormatterFactory +io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventFormatterFactory diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.api.version.rejected.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.api.version.rejected.yaml index 3610b32b36..43ffc6c452 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.api.version.rejected.yaml +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.api.version.rejected.yaml @@ -25,7 +25,7 @@ telemetry: - qname: test.app0 id: binding.kafka.api.version.rejected name: BINDING_KAFKA_API_VERSION_REJECTED - message: 32 0 + message: "DescribeConfigs (Version: 0)" bindings: app0: type: kafka diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.cluster.authorization.failed.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.cluster.authorization.failed.yaml index dcdc964079..ccd7c5f213 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.cluster.authorization.failed.yaml +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.cluster.authorization.failed.yaml @@ -25,7 +25,7 @@ telemetry: - qname: test.app0 id: binding.kafka.cluster.authorization.failed name: BINDING_KAFKA_CLUSTER_AUTHORIZATION_FAILED - message: 32 0 + message: "DescribeConfigs (Version: 0)" bindings: app0: type: kafka