Skip to content

Commit

Permalink
Enhance Kafka event descriptions (#1127)
Browse files Browse the repository at this point in the history
  • Loading branch information
jfallows authored Jul 3, 2024
1 parent 0fdf684 commit 137de5e
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.internal.events;

import java.util.regex.Pattern;

public enum KafkaApiKey
{
PRODUCE(0),
FETCH(1),
LIST_OFFSETS(2),
METADATA(3),
LEADER_AND_ISR(4),
STOP_REPLICA(5),
UPDATE_METADATA(6),
CONTROLLED_SHUTDOWN(7),
OFFSET_COMMIT(8),
OFFSET_FETCH(9),
FIND_COORDINATOR(10),
JOIN_GROUP(11),
HEARTBEAT(12),
LEAVE_GROUP(13),
SYNC_GROUP(14),
DESCRIBE_GROUPS(15),
LIST_GROUPS(16),
SASL_HANDSHAKE(17),
API_VERSIONS(18),
CREATE_TOPICS(19),
DELETE_TOPICS(20),
DELETE_RECORDS(21),
INIT_PRODUCER_ID(22),
OFFSET_FOR_LEADER_EPOCH(23),
ADD_PARTITIONS_TO_TXN(24),
ADD_OFFSETS_TO_TXN(25),
END_TXN(26),
WRITE_TXN_MARKERS(27),
TXN_OFFSET_COMMIT(28),
DESCRIBE_ACLS(29),
CREATE_ACLS(30),
DELETE_ACLS(31),
DESCRIBE_CONFIGS(32),
ALTER_CONFIGS(33),
ALTER_REPLICA_LOG_DIRS(34),
DESCRIBE_LOG_DIRS(35),
SASL_AUTHENTICATE(36),
CREATE_PARTITIONS(37),
CREATE_DELEGATION_TOKEN(38),
RENEW_DELEGATION_TOKEN(39),
EXPIRE_DELEGATION_TOKEN(40),
DESCRIBE_DELEGATION_TOKEN(41),
DELETE_GROUPS(42),
ELECT_LEADERS(43),
INCREMENTAL_ALTER_CONFIGS(44),
ALTER_PARTITION_REASSIGNMENTS(45),
LIST_PARTITION_REASSIGNMENTS(46),
OFFSET_DELETE(47),
DESCRIBE_CLIENT_QUOTAS(48),
ALTER_CLIENT_QUOTAS(49),
DESCRIBE_USER_SCRAM_CREDENTIALS(50),
ALTER_USER_SCRAM_CREDENTIALS(51),
DESCRIBE_QUORUM(55),
ALTER_PARTITION(56),
UPDATE_FEATURES(57),
ENVELOPE(58),
DESCRIBE_CLUSTER(60),
DESCRIBE_PRODUCERS(61),
UNREGISTER_BROKER(64),
DESCRIBE_TRANSACTIONS(65),
LIST_TRANSACTIONS(66),
ALLOCATE_PRODUCER_IDS(67),
CONSUMER_GROUP_HEARTBEAT(68),
CONSUMER_GROUP_DESCRIBE(69),
GET_TELEMETRY_SUBSCRIPTIONS(71),
PUSH_TELEMETRY(72),
LIST_CLIENT_METRICS_RESOURCES(74);

private final int key;
private final String title;

KafkaApiKey(
int key)
{
this.key = key;
this.title = toTitleCase(name());
}

public String title()
{
return title;
}

public static KafkaApiKey of(
int key)
{
KafkaApiKey value = null;

final KafkaApiKey[] values = KafkaApiKey.values();
for (int i = 0; i < values.length; i++)
{
if (values[i].key == key)
{
value = values[i];
break;
}
}

return value;
}

private static String toTitleCase(
String name)
{
return Pattern.compile("(?:_|^)([a-z])")
.matcher(name.toLowerCase())
.replaceAll(m -> m.group(1).toUpperCase());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.internal;
package io.aklivity.zilla.runtime.binding.kafka.internal.events;

import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.API_VERSION_REJECTED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.AUTHORIZATION_FAILED;
Expand All @@ -25,6 +25,7 @@
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventExFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.internal;
package io.aklivity.zilla.runtime.binding.kafka.internal.events;

import org.agrona.DirectBuffer;

Expand Down Expand Up @@ -56,13 +56,15 @@ public String format(
case API_VERSION_REJECTED:
{
final KafkaApiVersionRejectedExFW ex = extension.apiVersionRejected();
result = String.format("%d %d", ex.apiKey(), ex.apiVersion());
KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey());
result = String.format("%s (Version: %d)", apiKey.title(), ex.apiVersion());
break;
}
case CLUSTER_AUTHORIZATION_FAILED:
{
final KafkaClusterAuthorizationFailedExFW ex = extension.clusterAuthorizationFailed();
result = String.format("%d %d", ex.apiKey(), ex.apiVersion());
KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey());
result = String.format("%s (Version: %d)", apiKey.title(), ex.apiVersion());
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.internal;
package io.aklivity.zilla.runtime.binding.kafka.internal.events;

import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaEventContext;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventContext;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaAckMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
import io.aklivity.zilla.runtime.binding.kafka.identity.KafkaClientIdSupplier;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaEventContext;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaScramMechanism;
import io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventContext;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.RequestHeaderFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.ResponseHeaderFW;
Expand Down
2 changes: 1 addition & 1 deletion runtime/binding-kafka/src/main/moditect/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@
with io.aklivity.zilla.runtime.binding.kafka.internal.identity.KafkaConfluentClientIdSupplierFactory;

provides io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi
with io.aklivity.zilla.runtime.binding.kafka.internal.KafkaEventFormatterFactory;
with io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventFormatterFactory;
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
io.aklivity.zilla.runtime.binding.kafka.internal.KafkaEventFormatterFactory
io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventFormatterFactory
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ telemetry:
- qname: test.app0
id: binding.kafka.api.version.rejected
name: BINDING_KAFKA_API_VERSION_REJECTED
message: 32 0
message: "DescribeConfigs (Version: 0)"
bindings:
app0:
type: kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ telemetry:
- qname: test.app0
id: binding.kafka.cluster.authorization.failed
name: BINDING_KAFKA_CLUSTER_AUTHORIZATION_FAILED
message: 32 0
message: "DescribeConfigs (Version: 0)"
bindings:
app0:
type: kafka
Expand Down

0 comments on commit 137de5e

Please sign in to comment.