From e5bc12cf6163b3e3bd9d9fd2aad821d37627a157 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Onuki?= Date: Wed, 22 Jan 2025 14:23:23 -0500 Subject: [PATCH] Kafka clients node metrics (#2202) --- .../build.gradle | 4 +- .../kafka/CachedKafkaMetric.java | 7 + .../kafka/CachedKafkaMetrics.java | 73 +----- .../nr/instrumentation/kafka/ClientType.java | 23 ++ .../kafka/FiniteMetricRecorder.java | 7 + .../instrumentation/kafka/MetricNameUtil.java | 7 + .../kafka/NewRelicMetricsReporter.java | 36 ++- .../kafka/NodeTopicRegistry.java | 84 +++++++ .../kafka/ThreadFactories.java | 7 + .../KafkaConsumer_Instrumentation.java | 6 +- .../KafkaProducer_Instrumentation.java | 12 +- .../kafka/CachedKafkaMetricsTest.java | 90 +------ .../kafka/FiniteMetricRecorderTest.java | 7 + .../kafka/MetricNameUtilTest.java | 7 + .../kafka/NewRelicMetricsReporterTest.java | 50 +++- .../kafka/NodeTopicRegistryTest.java | 99 ++++++++ .../kafka/ThreadFactoriesTest.java | 7 + .../README.md | 31 +++ .../build.gradle | 24 ++ .../kafka/CachedKafkaMetric.java | 16 ++ .../kafka/CachedKafkaMetrics.java | 171 +++++++++++++ .../nr/instrumentation/kafka/ClientType.java | 23 ++ .../kafka/CumulativeSumSupport.java | 7 + .../kafka/FiniteMetricRecorder.java | 33 +++ .../instrumentation/kafka/MetricNameUtil.java | 113 +++++++++ .../kafka/NewRelicMetricsReporter.java | 133 +++++++++++ .../kafka/NodeTopicRegistry.java | 84 +++++++ .../kafka/ThreadFactories.java | 40 ++++ .../KafkaConsumer_Instrumentation.java | 26 +- .../KafkaProducer_Instrumentation.java | 43 ++++ .../kafka/CachedKafkaMetricsTest.java | 225 ++++++++++++++++++ .../kafka/FiniteMetricRecorderTest.java | 80 +++++++ .../kafka/MetricNameUtilTest.java | 187 +++++++++++++++ .../kafka/NewRelicMetricsReporterTest.java | 169 +++++++++++++ .../kafka/NodeTopicRegistryTest.java | 99 ++++++++ .../kafka/ThreadFactoriesTest.java | 31 +++ .../build.gradle | 2 +- .../kafka/CachedKafkaMetric.java | 7 + .../kafka/CachedKafkaMetrics.java | 7 + 
.../nr/instrumentation/kafka/ClientType.java | 23 ++ .../kafka/CumulativeSumSupport.java | 7 + .../kafka/FiniteMetricRecorder.java | 7 + .../instrumentation/kafka/MetricNameUtil.java | 7 + .../kafka/NewRelicMetricsReporter.java | 35 ++- .../kafka/NodeTopicRegistry.java | 84 +++++++ .../kafka/ThreadFactories.java | 7 + .../AsyncKafkaConsumer_Instrumentation.java | 51 ++++ .../LegacyKafkaConsumer_Instrumentation.java | 51 ++++ .../KafkaProducer_Instrumentation.java | 13 +- .../kafka/CachedKafkaMetricsTest.java | 7 + .../kafka/FiniteMetricRecorderTest.java | 7 + .../kafka/MetricNameUtilTest.java | 7 + .../kafka/NewRelicMetricsReporterTest.java | 69 ++++-- .../kafka/NodeTopicRegistryTest.java | 99 ++++++++ .../kafka/ThreadFactoriesTest.java | 7 + settings.gradle | 1 + 56 files changed, 2346 insertions(+), 213 deletions(-) create mode 100644 instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java create mode 100644 instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java create mode 100644 instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/README.md create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/build.gradle create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java rename instrumentation/{kafka-clients-node-metrics-1.0.0 => kafka-clients-node-metrics-2.3.0}/src/main/java/com/nr/instrumentation/kafka/CumulativeSumSupport.java (88%) create mode 100644 
instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java rename instrumentation/{kafka-clients-node-metrics-3.7.0 => kafka-clients-node-metrics-2.3.0}/src/main/java/org/apache/kafka/kafka/clients/consumer/KafkaConsumer_Instrumentation.java (67%) create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java create mode 100644 instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java create mode 100644 instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java create mode 100644 
instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java create mode 100644 instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/consumer/internals/AsyncKafkaConsumer_Instrumentation.java create mode 100644 instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/consumer/internals/LegacyKafkaConsumer_Instrumentation.java create mode 100644 instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/build.gradle b/instrumentation/kafka-clients-node-metrics-1.0.0/build.gradle index dfe79a74f1..4e835cf447 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/build.gradle +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/build.gradle @@ -2,7 +2,7 @@ dependencies { implementation(project(":newrelic-api")) implementation(project(":newrelic-weaver-api")) - implementation("org.apache.kafka:kafka-clients:3.3.0") + implementation("org.apache.kafka:kafka-clients:2.2.2") testImplementation("org.awaitility:awaitility:4.2.0") testImplementation("org.mockito:mockito-inline:4.11.0") @@ -14,7 +14,7 @@ jar { } verifyInstrumentation { - passesOnly 'org.apache.kafka:kafka-clients:[1.0.0,3.7.0)' + passesOnly 'org.apache.kafka:kafka-clients:[1.0.0,2.3.0)' } diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java index d582fd236f..e1484594b8 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New 
Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; interface CachedKafkaMetric { diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java index efbccd93b3..dc581701ce 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import org.apache.kafka.common.metrics.KafkaMetric; @@ -12,18 +19,6 @@ static CachedKafkaMetric newCachedKafkaMetric(final KafkaMetric metric) { return new CachedKafkaVersion(metric); } - Measurable measurable = null; - try { - measurable = metric.measurable(); - } catch (final IllegalStateException e) { - } - - final boolean isCumulativeSumType = measurable != null && - CumulativeSumSupport.isCumulativeSumClass(measurable.getClass().getName()); - if (isCumulativeSumType) { - return new CachedKafkaCounter(metric); - } - if (!(metric.metricValue() instanceof Number)) { return new InvalidCachedKafkaMetric(metric); } @@ -104,60 +99,6 @@ public void report(final FiniteMetricRecorder recorder) { } } - private static class CachedKafkaCounter implements CachedKafkaMetric { - private final KafkaMetric metric; - private static final Pattern totalPattern = Pattern.compile("-total$"); - - private final String counterMetricName; - private final String totalMetricName; - - private int previous = -1; - - public CachedKafkaCounter(final KafkaMetric metric) { - this.metric = metric; - - totalMetricName = 
MetricNameUtil.buildMetricName(metric); - - String metricName = metric.metricName().name(); - String counterName = totalPattern.matcher(metricName).replaceAll("-counter"); - if (counterName.equals(metricName)) { - counterName = metricName + "-counter"; - } - counterMetricName = MetricNameUtil.buildMetricName(metric, counterName); - } - - @Override - public boolean isValid() { - return true; - } - - @Override - public String displayName() { - return MetricNameUtil.buildDisplayName(metric); - } - - @Override - public void report(final FiniteMetricRecorder recorder) { - final Number value = ((Number) metric.metricValue()); - if (!recorder.tryRecordMetric(totalMetricName, value.floatValue())) { - // we can't trust the last observed value, so reset - previous = -1; - return; - } - - final int intValue = value.intValue(); - if (previous == -1L) { - previous = intValue; - return; - } - - final int delta = intValue - previous; - previous = intValue; - - recorder.incrementCounter(counterMetricName, delta); - } - } - private CachedKafkaMetrics() { // prevents instantiation } diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java new file mode 100644 index 0000000000..d93d534379 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java @@ -0,0 +1,23 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +public enum ClientType { + CONSUMER("Consume"), + PRODUCER("Produce"); + + private final String operation; + + ClientType(String operation) { + this.operation = operation; + } + + public String getOperation() { + return operation; + } +} diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java index 8a0daded74..668720dde1 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import com.newrelic.api.agent.NewRelic; diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java index 64c8196edc..8fe91a7341 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import org.apache.kafka.common.metrics.KafkaMetric; diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java index 9ad694cd69..570a90514f 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java @@ -1,10 +1,19 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import com.newrelic.api.agent.NewRelic; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricsReporter; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -26,23 +35,22 @@ public class NewRelicMetricsReporter implements MetricsReporter { private final ConcurrentHashMap metrics = new ConcurrentHashMap<>(); private final FiniteMetricRecorder recorder = new FiniteMetricRecorder(); + private final NodeTopicRegistry nodeTopicRegistry; + + public NewRelicMetricsReporter(ClientType clientType, Collection nodes) { + nodeTopicRegistry = new NodeTopicRegistry(clientType, nodes); + } @Override public void init(List metrics) { NewRelic.getAgent().getLogger().log(Level.INFO, - "newrelic-kafka-clients-enhancements: initializing with SUPPORTS_CUMULATIVE_SUM={0}", - CumulativeSumSupport.isCumulativeSumSupported()); + "newrelic-kafka-clients-enhancements: initializing. 
This version of Kafka does not support cumulative sum."); for (final KafkaMetric metric : metrics) { registerMetric(metric); } - future = SCHEDULER.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - report(); - } - }, 0, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS); + future = SCHEDULER.scheduleAtFixedRate(this::report, 0, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS); } @Override @@ -52,8 +60,6 @@ public void metricChange(KafkaMetric metric) { @Override public void metricRemoval(KafkaMetric metric) { - metrics.remove(metric.metricName()); - final CachedKafkaMetric cachedMetric = metrics.remove(metric.metricName()); if (cachedMetric != null) { debugLog("newrelic-kafka-clients-enhancements: deregister metric: {0}", cachedMetric.displayName()); @@ -66,6 +72,8 @@ public void close() { future.cancel(false); future = null; } + metrics.clear(); + nodeTopicRegistry.close(); } @Override @@ -80,11 +88,15 @@ private void registerMetric(final KafkaMetric metric) { return; } - if (TOPIC_METRICS_DISABLED && metric.metricName().tags().get("topic") != null) { + String topic = metric.metricName().tags().get("topic"); + if (TOPIC_METRICS_DISABLED && topic != null) { debugLog("newrelic-kafka-clients-enhancements: skipping topic metric registration: {0}", MetricNameUtil.buildDisplayName(metric)); return; } + if (nodeTopicRegistry.register(topic)) { + debugLog("newrelic-kafka-clients-enhancements: register node topic metric for topic: {0}", topic); + } final CachedKafkaMetric cachedMetric = CachedKafkaMetrics.newCachedKafkaMetric(metric); if (cachedMetric.isValid()) { @@ -102,6 +114,8 @@ private void report() { for (final CachedKafkaMetric metric : metrics.values()) { metric.report(recorder); } + + nodeTopicRegistry.report(recorder); } private void debugLog(String message) { diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java 
b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java new file mode 100644 index 0000000000..514073adbc --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java @@ -0,0 +1,84 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import org.apache.kafka.common.Node; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class is used to track all the metric names that are related to a specific node: + * + * - MessageBroker/Kafka/Nodes/host:port + * - MessageBroker/Kafka/Nodes/host:port/Consume/topicName + * - MessageBroker/Kafka/Nodes/host:port/Produce/topicName + * + * At initialization time we only have the node and the mode (is this a metrics reporter + * for a Kafka consumer or for a Kafka producer?). + * + * Then, as topics are discovered through the metricChange method, the topic metric names are + * generated. This is the best way we have to get track of the topics since they're not + * available when the KafkaConsumer/KafkaProducer is initialized. + * + * For KafkaConsumer, the SubscriptionState doesn't contain the topics and partitions + * at initialization time because it takes time for the rebalance to happen. + * + * For KafkaProducer, topics are dynamic since a producer could send records to any + * topic and the concept of subscription doesn't exist there. + * + * Alternatively we could get the topics from the records in KafkaProducer.doSend or + * KafkaConsumer.poll, and call NewRelicMetricsReporter.addTopicToNodeMetrics from there. + * This approach would have a small impact in performance, and getting the topics from the + * KafkaMetrics is a good enough solution. 
+ */ +public class NodeTopicRegistry { + private final Set recordedTopics = ConcurrentHashMap.newKeySet(); + private final Set metricNames = ConcurrentHashMap.newKeySet(); + private final Set nodes = new HashSet<>(); + private final ClientType clientType; + + private static final String METRIC_PREFIX = "MessageBroker/Kafka/Nodes/"; + + public NodeTopicRegistry(ClientType clientType, Collection nodes) { + this.clientType = clientType; + for (Node node : nodes) { + String nodeName = node.host() + ":" + node.port(); + this.nodes.add(nodeName); + this.metricNames.add(METRIC_PREFIX + nodeName); + } + } + + /** + * @return true if the topic was registered + */ + public boolean register(String topic) { + if (topic != null && recordedTopics.add(topic)) { + for (String node : nodes) { + String metricName = METRIC_PREFIX + node + "/" + clientType.getOperation() + "/" + topic; + this.metricNames.add(metricName); + } + return true; + } + return false; + } + + public void report(FiniteMetricRecorder recorder) { + for (String topicMetric : metricNames) { + recorder.recordMetric(topicMetric, 1.0f); + } + } + + public void close() { + recordedTopics.clear(); + metricNames.clear(); + nodes.clear(); + } +} diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java index 857c8fd807..853b985c01 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import java.util.concurrent.ThreadFactory; diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/org/apache/kafka/kafka/clients/consumer/KafkaConsumer_Instrumentation.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/org/apache/kafka/kafka/clients/consumer/KafkaConsumer_Instrumentation.java index 9a730c3fdf..41dd58172b 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/org/apache/kafka/kafka/clients/consumer/KafkaConsumer_Instrumentation.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/org/apache/kafka/kafka/clients/consumer/KafkaConsumer_Instrumentation.java @@ -12,7 +12,9 @@ import com.newrelic.api.agent.weaver.Weave; import com.newrelic.api.agent.weaver.WeaveAllConstructors; import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.ClientType; import com.nr.instrumentation.kafka.NewRelicMetricsReporter; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.common.metrics.Metrics; import java.util.logging.Level; @@ -22,7 +24,7 @@ public class KafkaConsumer_Instrumentation { private final Metrics metrics = Weaver.callOriginal(); private final String clientId = Weaver.callOriginal(); - + private final Metadata metadata = Weaver.callOriginal(); // It's possible for constructors to be invoked multiple times (e.g. `C() { C("some default") }` ). // When this happens we don't want to register the metrics reporter multiple times. 
@NewField @@ -33,7 +35,7 @@ public KafkaConsumer_Instrumentation() { if (!metricsReporterInstalled) { NewRelic.getAgent().getLogger().log(Level.INFO, "newrelic-kafka-clients-enhancements engaged for consumer {0}", clientId); - metrics.addReporter(new NewRelicMetricsReporter()); + metrics.addReporter(new NewRelicMetricsReporter(ClientType.CONSUMER, metadata.fetch().nodes())); metricsReporterInstalled = true; } } diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java index 203d59b1e0..e451d9b331 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package org.apache.kafka.kafka.clients.producer; import com.newrelic.api.agent.NewRelic; @@ -5,7 +12,9 @@ import com.newrelic.api.agent.weaver.Weave; import com.newrelic.api.agent.weaver.WeaveAllConstructors; import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.ClientType; import com.nr.instrumentation.kafka.NewRelicMetricsReporter; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.common.metrics.Metrics; import java.util.logging.Level; @@ -15,6 +24,7 @@ public class KafkaProducer_Instrumentation { private final Metrics metrics = Weaver.callOriginal(); private final String clientId = Weaver.callOriginal(); + private final Metadata metadata = Weaver.callOriginal(); // It's possible for constructors to be invoked multiple times (e.g. `C() { C("some default") }` ). 
// When this happens we don't want to register the metrics reporter multiple times. @@ -26,7 +36,7 @@ public KafkaProducer_Instrumentation() { if (!metricsReporterInstalled) { NewRelic.getAgent().getLogger().log(Level.INFO, "newrelic-kafka-clients-enhancements engaged for producer {0}", clientId); - metrics.addReporter(new NewRelicMetricsReporter()); + metrics.addReporter(new NewRelicMetricsReporter(ClientType.PRODUCER, metadata.fetch().nodes())); metricsReporterInstalled = true; } } diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java index 2b3898ddef..9fc86b5cbb 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java @@ -1,9 +1,15 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Value; import org.junit.Test; @@ -17,14 +23,11 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.*; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyFloat; +import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -81,81 +84,6 @@ public void cachedKafkaSummary() { verify(finiteMetricRecorder).recordMetric(eq("MessageBroker/Kafka/Internal/data/summary"), eq(2.0f)); } - @Test - public void cachedKafkaCounter() { - KafkaMetric counterKafkaMetric = createKafkaMetric(KafkaMetricType.COUNTER); - - CachedKafkaMetric cachedKafkaMetric = CachedKafkaMetrics.newCachedKafkaMetric(counterKafkaMetric); - - assertThat(cachedKafkaMetric.getClass().getName(), - equalTo("com.nr.instrumentation.kafka.CachedKafkaMetrics$CachedKafkaCounter")); - assertThat(cachedKafkaMetric.isValid(), is(true)); - assertThat(cachedKafkaMetric.displayName(), - equalTo("data/something {}")); - - when(finiteMetricRecorder.tryRecordMetric(any(), anyFloat())) - .thenReturn(true); - - cachedKafkaMetric.report(finiteMetricRecorder); - verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something"), eq(3.0f)); - verifyNoMoreInteractions(finiteMetricRecorder); - - 
cachedKafkaMetric.report(finiteMetricRecorder); - verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something"), eq(4.0f)); - verify(finiteMetricRecorder).incrementCounter(eq("MessageBroker/Kafka/Internal/data/something-counter"), eq(1)); - } - - @Test - public void cachedKafkaCounterTotal() { - KafkaMetric counterKafkaMetric = createKafkaMetric(KafkaMetricType.COUNTER_TOTAL); - - CachedKafkaMetric cachedKafkaMetric = CachedKafkaMetrics.newCachedKafkaMetric(counterKafkaMetric); - - assertThat(cachedKafkaMetric.getClass().getName(), - equalTo("com.nr.instrumentation.kafka.CachedKafkaMetrics$CachedKafkaCounter")); - assertThat(cachedKafkaMetric.isValid(), is(true)); - assertThat(cachedKafkaMetric.displayName(), - equalTo("data/something-total {}")); - - when(finiteMetricRecorder.tryRecordMetric(any(), anyFloat())) - .thenReturn(true); - - cachedKafkaMetric.report(finiteMetricRecorder); - verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something-total"), eq(4.0f)); - verifyNoMoreInteractions(finiteMetricRecorder); - - cachedKafkaMetric.report(finiteMetricRecorder); - verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something-total"), eq(5.0f)); - verify(finiteMetricRecorder).incrementCounter(eq("MessageBroker/Kafka/Internal/data/something-counter"), eq(1)); - } - - @Test - public void cachedKafkaCounterTotalCantTrustValue() { - KafkaMetric counterKafkaMetric = createKafkaMetric(KafkaMetricType.COUNTER_TOTAL); - - CachedKafkaMetric cachedKafkaMetric = CachedKafkaMetrics.newCachedKafkaMetric(counterKafkaMetric); - - assertThat(cachedKafkaMetric.getClass().getName(), - equalTo("com.nr.instrumentation.kafka.CachedKafkaMetrics$CachedKafkaCounter")); - assertThat(cachedKafkaMetric.isValid(), is(true)); - assertThat(cachedKafkaMetric.displayName(), - equalTo("data/something-total {}")); - - // when this method returns false, it means that the value was not recorded - // and 
thus, the increaseCount will not be called. - when(finiteMetricRecorder.tryRecordMetric(any(), anyFloat())) - .thenReturn(false); - - cachedKafkaMetric.report(finiteMetricRecorder); - verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something-total"), eq(4.0f)); - verifyNoMoreInteractions(finiteMetricRecorder); - - cachedKafkaMetric.report(finiteMetricRecorder); - verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something-total"), eq(5.0f)); - verifyNoMoreInteractions(finiteMetricRecorder); - } - - @Test public void cachedKafkaWithoutMeasurable() { KafkaMetric counterKafkaMetric = createKafkaMetric(KafkaMetricType.WITHOUT_MEASURABLE); @@ -198,8 +126,6 @@ private enum KafkaMetricType { VERSION("app-info", "version", new Value(), 42), INVALID("data", "invalid", new Max(), "towel"), SUMMARY("data", "summary", new Avg(), 2.0f), - COUNTER("data", "something", new CumulativeSum(), 3, 4), - COUNTER_TOTAL("data", "something-total", new CumulativeSum(), 4, 5), WITHOUT_MEASURABLE("data", "unmeasurable", null, 6), ; diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java index be7d5f86f2..78fc8fc43d 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import com.newrelic.api.agent.NewRelic; diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java index 35a3e1b30d..6d9e1a0d9a 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import org.apache.kafka.common.metrics.KafkaMetric; diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java index 05367b3cc5..e157b13f2a 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java @@ -1,9 +1,17 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import com.newrelic.agent.introspec.InstrumentationTestConfig; import com.newrelic.agent.introspec.InstrumentationTestRunner; import com.newrelic.agent.introspec.Introspector; import com.newrelic.agent.introspec.TracedMetricData; +import org.apache.kafka.common.Node; import org.apache.kafka.common.metrics.KafkaMetric; import org.junit.Before; import org.junit.Test; @@ -63,7 +71,9 @@ public void laterLoad() throws Exception { NewRelicMetricsReporter reporter = initMetricsReporter(Collections.emptyList(), otherMetrics); Map unscopedMetrics = introspector.getUnscopedMetrics(); - assertEquals(0, unscopedMetrics.size()); + assertEquals(1, unscopedMetrics.size()); + TracedMetricData nodeMetric = unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42"); + assertEquals(1.0f, nodeMetric.getTotalTimeInSec(), 0.1f); forceHarvest(reporter); @@ -78,27 +88,55 @@ public void laterLoad() throws Exception { @Test public void removeMetric() throws Exception { - List otherMetrics = Arrays.asList(METRIC1, METRIC2); + List initialMetrics = Arrays.asList(METRIC1, METRIC2); - NewRelicMetricsReporter reporter = initMetricsReporter(otherMetrics, Collections.emptyList()); + NewRelicMetricsReporter reporter = initMetricsReporter(initialMetrics, Collections.emptyList()); Map unscopedMetrics = introspector.getUnscopedMetrics(); - assertEquals(2, unscopedMetrics.size()); + assertEquals(3, unscopedMetrics.size()); // metric1, metric2 and node metric introspector.clear(); reporter.metricRemoval(METRIC2); forceHarvest(reporter); unscopedMetrics = introspector.getUnscopedMetrics(); - assertEquals(1, unscopedMetrics.size()); + assertEquals(2, unscopedMetrics.size()); TracedMetricData metric1 = unscopedMetrics.get("MessageBroker/Kafka/Internal/group/metric1"); assertEquals(42.0f, metric1.getTotalTimeInSec(), 0.1f); + TracedMetricData nodeMetric = 
unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42"); + assertEquals(1.0f, nodeMetric.getTotalTimeInSec(), 0.1f); + + reporter.close(); + } + + @Test + public void nodeTopicMetrics() throws InterruptedException{ + KafkaMetric metricWithTopic = getMetricMock("topicMetric", 20.0f); + when(metricWithTopic.metricName().tags().get("topic")).thenReturn("hhgg"); + List initialMetrics = Arrays.asList(metricWithTopic); + + NewRelicMetricsReporter reporter = initMetricsReporter(initialMetrics, Collections.emptyList()); + + Map unscopedMetrics = introspector.getUnscopedMetrics(); + assertEquals(3, unscopedMetrics.size()); + + TracedMetricData metric1 = unscopedMetrics.get("MessageBroker/Kafka/Internal/group/topicMetric"); + assertEquals(20.0f, metric1.getTotalTimeInSec(), 0.1f); + TracedMetricData nodeMetric = unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42"); + assertEquals(1.0f, nodeMetric.getTotalTimeInSec(), 0.1f); + TracedMetricData nodeTopicMetric = unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42/Consume/hhgg"); + assertEquals(1.0f, nodeTopicMetric.getTotalTimeInSec(), 0.1f); reporter.close(); } protected static NewRelicMetricsReporter initMetricsReporter(List initMetrics, Collection otherMetrics) throws InterruptedException { - NewRelicMetricsReporter metricsReporter = new NewRelicMetricsReporter(); + Node node = mock(Node.class); + when(node.host()) + .thenReturn("localhost"); + when(node.port()) + .thenReturn(42); + NewRelicMetricsReporter metricsReporter = new NewRelicMetricsReporter(ClientType.CONSUMER, Collections.singleton(node)); metricsReporter.init(initMetrics); // init triggers the first harvest that happens in a different thread. Sleeping to let it finish. 
Thread.sleep(100L); diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java new file mode 100644 index 0000000000..d0f39cf9f4 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java @@ -0,0 +1,99 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import org.apache.kafka.common.Node; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class NodeTopicRegistryTest { + + @Test + public void singleNodeTest() { + NodeTopicRegistry nodeTopicRegistry = new NodeTopicRegistry(ClientType.CONSUMER, Collections.singleton(getNode("ad"))); + assertTrue(nodeTopicRegistry.register("tm")); + assertFalse(nodeTopicRegistry.register("tm")); + assertTrue(nodeTopicRegistry.register("fp")); + assertTrue(nodeTopicRegistry.register("zb")); + + FiniteMetricRecorder finiteMetricRecorder = mock(FiniteMetricRecorder.class); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyMetrics(finiteMetricRecorder, + "MessageBroker/Kafka/Nodes/ad:42/Consume/tm", + "MessageBroker/Kafka/Nodes/ad:42/Consume/fp", + "MessageBroker/Kafka/Nodes/ad:42/Consume/zb", + "MessageBroker/Kafka/Nodes/ad:42" + ); + + + // 
verify nothing is reported after close + clearInvocations(finiteMetricRecorder); + + nodeTopicRegistry.close(); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyNoInteractions(finiteMetricRecorder); + } + + @Test + public void multiNodeTest() { + NodeTopicRegistry nodeTopicRegistry = new NodeTopicRegistry(ClientType.PRODUCER, Arrays.asList(getNode("hh"), getNode("gg"))); + assertTrue(nodeTopicRegistry.register("vp")); + assertFalse(nodeTopicRegistry.register("vp")); + assertTrue(nodeTopicRegistry.register("sep")); + + FiniteMetricRecorder finiteMetricRecorder = mock(FiniteMetricRecorder.class); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyMetrics(finiteMetricRecorder, + "MessageBroker/Kafka/Nodes/hh:42/Produce/vp", + "MessageBroker/Kafka/Nodes/hh:42/Produce/sep", + "MessageBroker/Kafka/Nodes/hh:42", + "MessageBroker/Kafka/Nodes/gg:42/Produce/vp", + "MessageBroker/Kafka/Nodes/gg:42/Produce/sep", + "MessageBroker/Kafka/Nodes/gg:42" + ); + + + // verify nothing is reported after close + clearInvocations(finiteMetricRecorder); + + nodeTopicRegistry.close(); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyNoInteractions(finiteMetricRecorder); + } + + private Node getNode(String host) { + Node node = Mockito.mock(Node.class); + when(node.host()).thenReturn(host); + when(node.port()).thenReturn(42); + return node; + } + + private void verifyMetrics(FiniteMetricRecorder metricRecorderMock, String ... 
metrics) { + for (String metric : metrics) { + verify(metricRecorderMock).recordMetric(eq(metric), eq(1.0f)); + } + verifyNoMoreInteractions(metricRecorderMock); + } +} \ No newline at end of file diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java index 2300b72a11..ff7865049e 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java +++ b/instrumentation/kafka-clients-node-metrics-1.0.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import org.junit.Test; diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/README.md b/instrumentation/kafka-clients-node-metrics-2.3.0/README.md new file mode 100644 index 0000000000..38650401d4 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/README.md @@ -0,0 +1,31 @@ +# Kafka Clients Node Metrics instrumentation + +This instrumentation is a replacement for the existing `kafka-clients-metrics` instrumentation. +It has a smaller scope as it just reads from KafkaMetrics and does not provide other functionality present in `kafka-client-metrics` instrumentation. + +## Instrumentation + +Whenever a Consumer or Producer is instantiated this instrumentation adds a MetricsReporter to the respective Metrics object. +This MetricsReporter will then be provided all the metrics related to that Consumer/Producer as well as some metrics related to the Kafka infrastructure. +Every period the MetricsReporter will then iterate thru all the metrics, convert into agent metrics, and then queue those metrics for sending. 
+ +This functionality is very similar to the metrics functionality of the `kafka-clients-metrics` instrumentation. The difference is that this instrumentation module will read the tags in the metric and add details about the node and the client to the metrics sent to New Relic. + +## Configuration + +Option | Default | Description +-------------------------------------|---------|-------------------------------------------- +kafka.metrics.debug.enabled | false | Whether to log debug information. +kafka.metrics.node.metrics.disabled | false | Whether to disable sending node metrics. +kafka.metrics.topic.metrics.disabled | false | Whether to disable sending node/topic metrics. +kafka.metrics.interval | 30 | Number of seconds between metrics reports. + +## Comparison to kafka-clients-metrics + +The `kafka-clients-metrics` instrumentation has the following functionality which is not present in this instrumentation module: +- metrics about serialization and deserialization; +- tracing `KafkaProducer.doSend`; +- reporting `KafkaProducer.doSend` and `KafkaConsumer.poll` as externals; +- noticing errors when `KafkaProducer.doSend` or `KafkaConsumer.poll` throws an exception; +- rebalancing metrics; +- metrics that list the nodes and topics consumed/produced by each.
\ No newline at end of file diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/build.gradle b/instrumentation/kafka-clients-node-metrics-2.3.0/build.gradle new file mode 100644 index 0000000000..85c23079d3 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/build.gradle @@ -0,0 +1,24 @@ + +dependencies { + implementation(project(":newrelic-api")) + implementation(project(":newrelic-weaver-api")) + implementation("org.apache.kafka:kafka-clients:3.6.2") + + testImplementation("org.awaitility:awaitility:4.2.0") + testImplementation("org.mockito:mockito-inline:4.11.0") +} + +jar { + manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-clients-node-metrics-2.3.0', 'Enabled': 'false' , + 'Implementation-Title-Alias': 'kafka-clients-node-metrics' } +} + +verifyInstrumentation { + passesOnly 'org.apache.kafka:kafka-clients:[2.3.0,3.7.0)' +} + + +site { + title 'Kafka' + type 'Messaging' +} diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java new file mode 100644 index 0000000000..e1484594b8 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java @@ -0,0 +1,16 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +interface CachedKafkaMetric { + boolean isValid(); + + String displayName(); + + void report(final FiniteMetricRecorder recorder); +} diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java new file mode 100644 index 0000000000..8b506b7a6c --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java @@ -0,0 +1,171 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; + +import java.util.regex.Pattern; + +class CachedKafkaMetrics { + + static CachedKafkaMetric newCachedKafkaMetric(final KafkaMetric metric) { + if ("app-info".equals(metric.metricName().group()) && "version".equals(metric.metricName().name())) { + return new CachedKafkaVersion(metric); + } + + Measurable measurable = null; + try { + measurable = metric.measurable(); + } catch (final IllegalStateException e) { + } + + final boolean isCumulativeSumType = measurable != null && + CumulativeSumSupport.isCumulativeSumClass(measurable.getClass().getName()); + if (isCumulativeSumType) { + return new CachedKafkaCounter(metric); + } + + if (!(metric.metricValue() instanceof Number)) { + return new InvalidCachedKafkaMetric(metric); + } + + return new CachedKafkaSummary(metric); + } + + private static class CachedKafkaVersion implements CachedKafkaMetric { + private final KafkaMetric metric; + private final String newRelicMetricName; + + public CachedKafkaVersion(final KafkaMetric metric) { + this.metric = metric; + this.newRelicMetricName = 
MetricNameUtil.METRIC_PREFIX + "app-info/version/" + metric.metricValue(); + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public String displayName() { + return "app-info/version/" + metric.metricValue(); + } + + @Override + public void report(final FiniteMetricRecorder recorder) { + recorder.recordMetric(newRelicMetricName, 1.0f); + } + } + + private static class InvalidCachedKafkaMetric implements CachedKafkaMetric { + private final KafkaMetric metric; + + public InvalidCachedKafkaMetric(final KafkaMetric metric) { + this.metric = metric; + } + + @Override + public boolean isValid() { + return false; + } + + @Override + public String displayName() { + return MetricNameUtil.buildDisplayName(metric); + } + + @Override + public void report(FiniteMetricRecorder recorder) { + // no-op + } + } + + private static class CachedKafkaSummary implements CachedKafkaMetric { + private final KafkaMetric metric; + private final String newRelicMetricName; + + public CachedKafkaSummary(final KafkaMetric metric) { + this.metric = metric; + this.newRelicMetricName = MetricNameUtil.buildMetricName(metric); + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public String displayName() { + return MetricNameUtil.buildDisplayName(metric); + } + + @Override + public void report(final FiniteMetricRecorder recorder) { + recorder.recordMetric(newRelicMetricName, ((Number) metric.metricValue()).floatValue()); + } + } + + private static class CachedKafkaCounter implements CachedKafkaMetric { + private final KafkaMetric metric; + private static final Pattern totalPattern = Pattern.compile("-total$"); + + private final String counterMetricName; + private final String totalMetricName; + + private int previous = -1; + + public CachedKafkaCounter(final KafkaMetric metric) { + this.metric = metric; + + totalMetricName = MetricNameUtil.buildMetricName(metric); + + String metricName = metric.metricName().name(); + String counterName = 
totalPattern.matcher(metricName).replaceAll("-counter"); + if (counterName.equals(metricName)) { + counterName = metricName + "-counter"; + } + counterMetricName = MetricNameUtil.buildMetricName(metric, counterName); + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public String displayName() { + return MetricNameUtil.buildDisplayName(metric); + } + + @Override + public void report(final FiniteMetricRecorder recorder) { + final Number value = ((Number) metric.metricValue()); + if (!recorder.tryRecordMetric(totalMetricName, value.floatValue())) { + // we can't trust the last observed value, so reset + previous = -1; + return; + } + + final int intValue = value.intValue(); + if (previous == -1L) { + previous = intValue; + return; + } + + final int delta = intValue - previous; + previous = intValue; + + recorder.incrementCounter(counterMetricName, delta); + } + } + + private CachedKafkaMetrics() { + // prevents instantiation + } +} diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java new file mode 100644 index 0000000000..d93d534379 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java @@ -0,0 +1,23 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +public enum ClientType { + CONSUMER("Consume"), + PRODUCER("Produce"); + + private final String operation; + + ClientType(String operation) { + this.operation = operation; + } + + public String getOperation() { + return operation; + } +} diff --git a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/CumulativeSumSupport.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/CumulativeSumSupport.java similarity index 88% rename from instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/CumulativeSumSupport.java rename to instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/CumulativeSumSupport.java index 7dc226c077..885379ee26 100644 --- a/instrumentation/kafka-clients-node-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/CumulativeSumSupport.java +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/CumulativeSumSupport.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; public class CumulativeSumSupport { diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java new file mode 100644 index 0000000000..668720dde1 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java @@ -0,0 +1,33 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import com.newrelic.api.agent.NewRelic; + +class FiniteMetricRecorder { + public void incrementCounter(final String metric, final int value) { + NewRelic.incrementCounter(metric, value); + } + + public boolean tryRecordMetric(final String metric, final float value) { + if (Float.isNaN(value) || Float.isInfinite(value)) { + return false; + } + + NewRelic.recordMetric(metric, value); + return true; + } + + public void recordMetric(final String metric, final float value) { + if (Float.isNaN(value) || Float.isInfinite(value)) { + return; + } + + NewRelic.recordMetric(metric, value); + } +} diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java new file mode 100644 index 0000000000..8fe91a7341 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java @@ -0,0 +1,113 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import org.apache.kafka.common.metrics.KafkaMetric; + +public class MetricNameUtil { + static final String METRIC_PREFIX = "MessageBroker/Kafka/Internal/"; + + static String buildDisplayName(final KafkaMetric metric) { + return String.format("%s/%s %s", metric.metricName().group(), metric.metricName().name(), metric.metricName().tags()); + } + + static String buildMetricName(final KafkaMetric metric) { + return buildMetricName(metric, null); + } + + static String buildMetricName(final KafkaMetric metric, final String nameOverride) { + final String name = nameOverride != null ? 
nameOverride : metric.metricName().name(); + final String metricPrefix = METRIC_PREFIX + metric.metricName().group() + "/"; + + final String clientId = metric.metricName().tags().get("client-id"); + if (clientId == null) { + return metricPrefix + name; + } + + // is it a per-topic metric? + final String topic = metric.metricName().tags().get("topic"); + if (topic != null) { + return metricPrefix + "topic/" + topic + "/client/" + clientId + "/" + name; + } + + // is it a per-node metric? + String nodeId = metric.metricName().tags().get("node-id"); + if (nodeId != null) { + nodeId = normalizeNodeId(nodeId); + return metricPrefix + "node/" + nodeId + "/client/" + clientId + "/" + name; + } + + return metricPrefix + "client/" + clientId + "/" + name; + } + + private static String normalizeNodeId(final String nodeId) { + // + // sometimes node IDs get weird. let's try to clean things up a bit. + // + + final String[] parts = nodeId.split("-", 2); + if (parts.length != 2) { + // + // a strange node ID that doesn't conform to the expected pattern. leave it be. + // + return nodeId; + } + + final int num; + try { + num = Integer.parseInt(parts[1]); + } catch (final NumberFormatException e) { + // + // non-numeric value in the node ID. weird, but OK. + // + return nodeId; + } + + // + // negative node IDs are used for seed brokers (i.e. initial metadata bootstrap) + // the negative values are pretty useless in practice and just act as placeholders + // for the metadata request. once the metadata request is complete we know the real + // broker IDs and things get more interesting. + // + // return "seed" for negative node IDs since it's probably more useful to users + // than some confusing pseudo-ID. + // + if (num < 0) { + return "seed"; + } + + // + // try to detect coordinator node IDs. what is this nonsense? I'm so glad you asked. + // + // group coordinator node IDs get munged in order to separate the coordinator + // "control plane" from the data plane. 
this is achieved by subtracting the + // true node ID from Integer.MAX_VALUE. here we just unmunge the node ID to + // get the true ID of the group coordinator to report something more useful + // to users. + // + // note there's no "guaranteed" way to avoid conflicts across the node ID + // "namespace" so we can't actually tell the difference between a coordinator + // node ID and a "regular" node ID, but here we assume that node IDs aren't + // typically huge (in practice I believe they're limited to fairly small but + // configurable values on the broker side anyway) + // + final int coordinatorNodeId = Integer.MAX_VALUE - num; + if (coordinatorNodeId > 0 && coordinatorNodeId < (Integer.MAX_VALUE & 0xff000000)){ + return "coordinator-" + coordinatorNodeId; + } + + // + // fall back to the unmodified node ID that was passed in (this should be the typical case) + // + return nodeId; + } + + private MetricNameUtil() { + // prevents instantiation + } +} diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java new file mode 100644 index 0000000000..956fd1f476 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java @@ -0,0 +1,133 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import com.newrelic.api.agent.NewRelic; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsReporter; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +public class NewRelicMetricsReporter implements MetricsReporter { + + private static final boolean METRICS_DEBUG = NewRelic.getAgent().getConfig().getValue("kafka.metrics.debug.enabled", false); + private static final boolean NODE_METRICS_DISABLED = NewRelic.getAgent().getConfig().getValue("kafka.metrics.node.metrics.disabled", false); + private static final boolean TOPIC_METRICS_DISABLED = NewRelic.getAgent().getConfig().getValue("kafka.metrics.topic.metrics.disabled", false); + private static final long REPORTING_INTERVAL_IN_SECONDS = NewRelic.getAgent().getConfig().getValue("kafka.metrics.interval", 30); + private static final ScheduledExecutorService SCHEDULER = + Executors.newSingleThreadScheduledExecutor(ThreadFactories.build("NewRelicMetricsReporter-Kafka")); + private ScheduledFuture future; + + private final ConcurrentHashMap metrics = new ConcurrentHashMap<>(); + private final FiniteMetricRecorder recorder = new FiniteMetricRecorder(); + private final NodeTopicRegistry nodeTopicRegistry; + + public NewRelicMetricsReporter(ClientType clientType, Collection nodes) { + nodeTopicRegistry = new NodeTopicRegistry(clientType, nodes); + } + + @Override + public void init(List metrics) { + NewRelic.getAgent().getLogger().log(Level.INFO, + "newrelic-kafka-clients-enhancements: initializing with 
SUPPORTS_CUMULATIVE_SUM={0}", + CumulativeSumSupport.isCumulativeSumSupported()); + + for (final KafkaMetric metric : metrics) { + registerMetric(metric); + } + + future = SCHEDULER.scheduleAtFixedRate(this::report, 0, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS); + } + + @Override + public void metricChange(KafkaMetric metric) { + registerMetric(metric); + } + + @Override + public void metricRemoval(KafkaMetric metric) { + final CachedKafkaMetric cachedMetric = metrics.remove(metric.metricName()); + if (cachedMetric != null) { + debugLog("newrelic-kafka-clients-enhancements: deregister metric: {0}", cachedMetric.displayName()); + } + } + + @Override + public void close() { + if (future != null) { + future.cancel(false); + future = null; + } + metrics.clear(); + nodeTopicRegistry.close(); + } + + @Override + public void configure(Map configs) { + + } + + private void registerMetric(final KafkaMetric metric) { + if (NODE_METRICS_DISABLED && metric.metricName().tags().get("node-id") != null) { + debugLog("newrelic-kafka-clients-enhancements: skipping node metric registration: {0}", + MetricNameUtil.buildDisplayName(metric)); + return; + } + + String topic = metric.metricName().tags().get("topic"); + if (TOPIC_METRICS_DISABLED && topic != null) { + debugLog("newrelic-kafka-clients-enhancements: skipping topic metric registration: {0}", + MetricNameUtil.buildDisplayName(metric)); + return; + } + if (nodeTopicRegistry.register(topic)) { + debugLog("newrelic-kafka-clients-enhancements: register node topic metric for topic: {0}", topic); + } + + final CachedKafkaMetric cachedMetric = CachedKafkaMetrics.newCachedKafkaMetric(metric); + if (cachedMetric.isValid()) { + debugLog("newrelic-kafka-clients-enhancements: register metric: {0}", cachedMetric.displayName()); + + this.metrics.put(metric.metricName(), cachedMetric); + } else { + debugLog("newrelic-kafka-clients-enhancements: skipping metric registration: {0}", cachedMetric.displayName()); + } + } + + private void 
report() { + debugLog("newrelic-kafka-clients-enhancements: reporting Kafka metrics"); + + for (final CachedKafkaMetric metric : metrics.values()) { + metric.report(recorder); + } + + nodeTopicRegistry.report(recorder); + } + + private void debugLog(String message) { + if (METRICS_DEBUG) { + NewRelic.getAgent().getLogger().log(Level.INFO, message); + } + } + + private void debugLog(String message, Object value) { + if (METRICS_DEBUG) { + NewRelic.getAgent().getLogger().log(Level.INFO, message, value); + } + } +} diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java new file mode 100644 index 0000000000..514073adbc --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java @@ -0,0 +1,84 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import org.apache.kafka.common.Node; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class is used to track all the metric names that are related to a specific node: + * + * - MessageBroker/Kafka/Nodes/host:port + * - MessageBroker/Kafka/Nodes/host:port/Consume/topicName + * - MessageBroker/Kafka/Nodes/host:port/Produce/topicName + * + * At initialization time we only have the node and the mode (is this a metrics reporter + * for a Kafka consumer or for a Kafka producer?). + * + * Then, as topics are discovered through the metricChange method, the topic metric names are + * generated. This is the best way we have to get track of the topics since they're not + * available when the KafkaConsumer/KafkaProducer is initialized. 
+ * + * For KafkaConsumer, the SubscriptionState doesn't contain the topics and partitions + * at initialization time because it takes time for the rebalance to happen. + * + * For KafkaProducer, topics are dynamic since a producer could send records to any + * topic and the concept of subscription doesn't exist there. + * + * Alternatively we could get the topics from the records in KafkaProducer.doSend or + * KafkaConsumer.poll, and call NewRelicMetricsReporter.addTopicToNodeMetrics from there. + * This approach would have a small impact in performance, and getting the topics from the + * KafkaMetrics is a good enough solution. + */ +public class NodeTopicRegistry { + private final Set recordedTopics = ConcurrentHashMap.newKeySet(); + private final Set metricNames = ConcurrentHashMap.newKeySet(); + private final Set nodes = new HashSet<>(); + private final ClientType clientType; + + private static final String METRIC_PREFIX = "MessageBroker/Kafka/Nodes/"; + + public NodeTopicRegistry(ClientType clientType, Collection nodes) { + this.clientType = clientType; + for (Node node : nodes) { + String nodeName = node.host() + ":" + node.port(); + this.nodes.add(nodeName); + this.metricNames.add(METRIC_PREFIX + nodeName); + } + } + + /** + * @return true if the topic was registered + */ + public boolean register(String topic) { + if (topic != null && recordedTopics.add(topic)) { + for (String node : nodes) { + String metricName = METRIC_PREFIX + node + "/" + clientType.getOperation() + "/" + topic; + this.metricNames.add(metricName); + } + return true; + } + return false; + } + + public void report(FiniteMetricRecorder recorder) { + for (String topicMetric : metricNames) { + recorder.recordMetric(topicMetric, 1.0f); + } + } + + public void close() { + recordedTopics.clear(); + metricNames.clear(); + nodes.clear(); + } +} diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java 
b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java new file mode 100644 index 0000000000..853b985c01 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java @@ -0,0 +1,40 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class ThreadFactories { + public static ThreadFactory build(final String serviceName) { + return new DefaultThreadFactory(serviceName, true); + } + + private ThreadFactories() { + // prevents instantiation + } + + private static class DefaultThreadFactory implements ThreadFactory { + private final String serviceName; + private final AtomicInteger counter; + private final boolean daemon; + + private DefaultThreadFactory(String serviceName, boolean daemon) { + this.serviceName = serviceName; + counter = new AtomicInteger(0); + this.daemon = daemon; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "New Relic " + serviceName + " #" + counter.incrementAndGet()); + thread.setDaemon(daemon); + return thread; + } + } +} diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/consumer/KafkaConsumer_Instrumentation.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/org/apache/kafka/kafka/clients/consumer/KafkaConsumer_Instrumentation.java similarity index 67% rename from instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/consumer/KafkaConsumer_Instrumentation.java rename to instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/org/apache/kafka/kafka/clients/consumer/KafkaConsumer_Instrumentation.java index 
e5265d868d..73d01f5218 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/consumer/KafkaConsumer_Instrumentation.java +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/org/apache/kafka/kafka/clients/consumer/KafkaConsumer_Instrumentation.java @@ -11,20 +11,20 @@ import com.newrelic.api.agent.weaver.NewField; import com.newrelic.api.agent.weaver.Weave; import com.newrelic.api.agent.weaver.WeaveAllConstructors; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.ClientType; import com.nr.instrumentation.kafka.NewRelicMetricsReporter; +import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; +import org.apache.kafka.common.metrics.Metrics; -import java.time.Duration; -import java.util.Map; import java.util.logging.Level; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.metrics.Metrics; - @Weave(originalName = "org.apache.kafka.clients.consumer.KafkaConsumer") -public abstract class KafkaConsumer_Instrumentation { +public class KafkaConsumer_Instrumentation { + private final Metrics metrics = Weaver.callOriginal(); + private final String clientId = Weaver.callOriginal(); + private final ConsumerMetadata metadata = Weaver.callOriginal(); // It's possible for constructors to be invoked multiple times (e.g. `C() { C("some default") }` ). // When this happens we don't want to register the metrics reporter multiple times. 
@NewField @@ -32,19 +32,11 @@ public abstract class KafkaConsumer_Instrumentation { @WeaveAllConstructors public KafkaConsumer_Instrumentation() { - - String clientId = clientInstanceId(Duration.ofSeconds(1)).toString(); - Metrics metrics = (Metrics) metrics(); - if (!metricsReporterInstalled) { NewRelic.getAgent().getLogger().log(Level.INFO, "newrelic-kafka-clients-enhancements engaged for consumer {0}", clientId); - metrics.addReporter(new NewRelicMetricsReporter()); + metrics.addReporter(new NewRelicMetricsReporter(ClientType.CONSUMER, metadata.fetch().nodes())); metricsReporterInstalled = true; } } - - public abstract Uuid clientInstanceId(Duration timeout); - - public abstract Map metrics(); } diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java new file mode 100644 index 0000000000..a22972d4fb --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java @@ -0,0 +1,43 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.apache.kafka.kafka.clients.producer; + +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.weaver.NewField; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.WeaveAllConstructors; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.ClientType; +import com.nr.instrumentation.kafka.NewRelicMetricsReporter; +import org.apache.kafka.clients.producer.internals.ProducerMetadata; +import org.apache.kafka.common.metrics.Metrics; + +import java.util.logging.Level; + +@Weave(originalName = "org.apache.kafka.clients.producer.KafkaProducer") +public class KafkaProducer_Instrumentation { + + private final Metrics metrics = Weaver.callOriginal(); + private final String clientId = Weaver.callOriginal(); + private final ProducerMetadata metadata = Weaver.callOriginal(); + + // It's possible for constructors to be invoked multiple times (e.g. `C() { C("some default") }` ). + // When this happens we don't want to register the metrics reporter multiple times. 
+ @NewField + private boolean metricsReporterInstalled; + + @WeaveAllConstructors + public KafkaProducer_Instrumentation() { + if (!metricsReporterInstalled) { + NewRelic.getAgent().getLogger().log(Level.INFO, + "newrelic-kafka-clients-enhancements engaged for producer {0}", clientId); + metrics.addReporter(new NewRelicMetricsReporter(ClientType.PRODUCER, metadata.fetch().nodes())); + metricsReporterInstalled = true; + } + } +} diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java new file mode 100644 index 0000000000..a5a2f6728f --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java @@ -0,0 +1,225 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Value; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.OngoingStubbing; + +import java.util.HashMap; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyFloat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static 
org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class CachedKafkaMetricsTest { + + @Mock + private FiniteMetricRecorder finiteMetricRecorder; + + @Test + public void cachedKafkaVersion() { + KafkaMetric versionKafkaMetric = createKafkaMetric(KafkaMetricType.VERSION); + + CachedKafkaMetric cachedKafkaMetric = CachedKafkaMetrics.newCachedKafkaMetric(versionKafkaMetric); + + assertThat(cachedKafkaMetric.getClass().getName(), + equalTo("com.nr.instrumentation.kafka.CachedKafkaMetrics$CachedKafkaVersion")); + assertThat(cachedKafkaMetric.isValid(), is(true)); + assertThat(cachedKafkaMetric.displayName(), + equalTo("app-info/version/42")); + + cachedKafkaMetric.report(finiteMetricRecorder); + verify(finiteMetricRecorder).recordMetric(eq("MessageBroker/Kafka/Internal/app-info/version/42"), eq(1.0f)); + } + + @Test + public void invalidCachedKafkaMetric() { + KafkaMetric invalidKafkaMetric = createKafkaMetric(KafkaMetricType.INVALID); + + CachedKafkaMetric cachedKafkaMetric = CachedKafkaMetrics.newCachedKafkaMetric(invalidKafkaMetric); + + assertThat(cachedKafkaMetric.getClass().getName(), + equalTo("com.nr.instrumentation.kafka.CachedKafkaMetrics$InvalidCachedKafkaMetric")); + assertThat(cachedKafkaMetric.isValid(), is(false)); + assertThat(cachedKafkaMetric.displayName(), + equalTo("data/invalid {}")); + + cachedKafkaMetric.report(finiteMetricRecorder); + verifyNoInteractions(finiteMetricRecorder); + } + + @Test + public void cachedKafkaSummary() { + KafkaMetric summaryKafkaMetric = createKafkaMetric(KafkaMetricType.SUMMARY); + + CachedKafkaMetric cachedKafkaMetric = CachedKafkaMetrics.newCachedKafkaMetric(summaryKafkaMetric); + + assertThat(cachedKafkaMetric.getClass().getName(), + equalTo("com.nr.instrumentation.kafka.CachedKafkaMetrics$CachedKafkaSummary")); + assertThat(cachedKafkaMetric.isValid(), is(true)); + 
assertThat(cachedKafkaMetric.displayName(), + equalTo("data/summary {}")); + + cachedKafkaMetric.report(finiteMetricRecorder); + verify(finiteMetricRecorder).recordMetric(eq("MessageBroker/Kafka/Internal/data/summary"), eq(2.0f)); + } + + @Test + public void cachedKafkaCounter() { + KafkaMetric counterKafkaMetric = createKafkaMetric(KafkaMetricType.COUNTER); + + CachedKafkaMetric cachedKafkaMetric = CachedKafkaMetrics.newCachedKafkaMetric(counterKafkaMetric); + + assertThat(cachedKafkaMetric.getClass().getName(), + equalTo("com.nr.instrumentation.kafka.CachedKafkaMetrics$CachedKafkaCounter")); + assertThat(cachedKafkaMetric.isValid(), is(true)); + assertThat(cachedKafkaMetric.displayName(), + equalTo("data/something {}")); + + when(finiteMetricRecorder.tryRecordMetric(any(), anyFloat())) + .thenReturn(true); + + cachedKafkaMetric.report(finiteMetricRecorder); + verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something"), eq(3.0f)); + verifyNoMoreInteractions(finiteMetricRecorder); + + cachedKafkaMetric.report(finiteMetricRecorder); + verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something"), eq(4.0f)); + verify(finiteMetricRecorder).incrementCounter(eq("MessageBroker/Kafka/Internal/data/something-counter"), eq(1)); + } + + @Test + public void cachedKafkaCounterTotal() { + KafkaMetric counterKafkaMetric = createKafkaMetric(KafkaMetricType.COUNTER_TOTAL); + + CachedKafkaMetric cachedKafkaMetric = CachedKafkaMetrics.newCachedKafkaMetric(counterKafkaMetric); + + assertThat(cachedKafkaMetric.getClass().getName(), + equalTo("com.nr.instrumentation.kafka.CachedKafkaMetrics$CachedKafkaCounter")); + assertThat(cachedKafkaMetric.isValid(), is(true)); + assertThat(cachedKafkaMetric.displayName(), + equalTo("data/something-total {}")); + + when(finiteMetricRecorder.tryRecordMetric(any(), anyFloat())) + .thenReturn(true); + + cachedKafkaMetric.report(finiteMetricRecorder); + 
verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something-total"), eq(4.0f)); + verifyNoMoreInteractions(finiteMetricRecorder); + + cachedKafkaMetric.report(finiteMetricRecorder); + verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something-total"), eq(5.0f)); + verify(finiteMetricRecorder).incrementCounter(eq("MessageBroker/Kafka/Internal/data/something-counter"), eq(1)); + } + + @Test + public void cachedKafkaCounterTotalCantTrustValue() { + KafkaMetric counterKafkaMetric = createKafkaMetric(KafkaMetricType.COUNTER_TOTAL); + + CachedKafkaMetric cachedKafkaMetric = CachedKafkaMetrics.newCachedKafkaMetric(counterKafkaMetric); + + assertThat(cachedKafkaMetric.getClass().getName(), + equalTo("com.nr.instrumentation.kafka.CachedKafkaMetrics$CachedKafkaCounter")); + assertThat(cachedKafkaMetric.isValid(), is(true)); + assertThat(cachedKafkaMetric.displayName(), + equalTo("data/something-total {}")); + + // when this method returns false, it means that the value was not recorded + // and thus, the increaseCount will not be called. 
+ when(finiteMetricRecorder.tryRecordMetric(any(), anyFloat())) + .thenReturn(false); + + cachedKafkaMetric.report(finiteMetricRecorder); + verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something-total"), eq(4.0f)); + verifyNoMoreInteractions(finiteMetricRecorder); + + cachedKafkaMetric.report(finiteMetricRecorder); + verify(finiteMetricRecorder).tryRecordMetric(eq("MessageBroker/Kafka/Internal/data/something-total"), eq(5.0f)); + verifyNoMoreInteractions(finiteMetricRecorder); + } + + + @Test + public void cachedKafkaWithoutMeasurable() { + KafkaMetric counterKafkaMetric = createKafkaMetric(KafkaMetricType.WITHOUT_MEASURABLE); + + CachedKafkaMetric cachedKafkaMetric = CachedKafkaMetrics.newCachedKafkaMetric(counterKafkaMetric); + + assertThat(cachedKafkaMetric.getClass().getName(), + equalTo("com.nr.instrumentation.kafka.CachedKafkaMetrics$CachedKafkaSummary")); + assertThat(cachedKafkaMetric.isValid(), is(true)); + assertThat(cachedKafkaMetric.displayName(), + equalTo("data/unmeasurable {}")); + + cachedKafkaMetric.report(finiteMetricRecorder); + verify(finiteMetricRecorder).recordMetric(eq("MessageBroker/Kafka/Internal/data/unmeasurable"), eq(6.0f)); + } + + private KafkaMetric createKafkaMetric(KafkaMetricType metricType) { + KafkaMetric kafkaMetric = mock(KafkaMetric.class, Mockito.RETURNS_DEEP_STUBS); + when(kafkaMetric.metricName().name()) + .thenReturn(metricType.metricName); + when(kafkaMetric.metricName().group()) + .thenReturn(metricType.metricGroup); + + OngoingStubbing valuesStubbing = when(kafkaMetric.metricValue()); + for (Object value : metricType.values) { + valuesStubbing = valuesStubbing.thenReturn(value); + } + + when(kafkaMetric.measurable()) + .thenReturn(metricType.measurable); + when(kafkaMetric.metricName().tags()) + .thenReturn(new HashMap<>()); + return kafkaMetric; + } + + /** + * These are the scenarios being tested and respective values. 
+ */ + private enum KafkaMetricType { + VERSION("app-info", "version", new Value(), 42), + INVALID("data", "invalid", new Max(), "towel"), + SUMMARY("data", "summary", new Avg(), 2.0f), + COUNTER("data", "something", new CumulativeSum(), 3, 4), + COUNTER_TOTAL("data", "something-total", new CumulativeSum(), 4, 5), + WITHOUT_MEASURABLE("data", "unmeasurable", null, 6), + ; + + KafkaMetricType(String metricGroup, String metricName, Measurable measurable, Object... values) { + this.metricGroup = metricGroup; + this.metricName = metricName; + this.values = values; + this.measurable = measurable; + } + + private final String metricGroup; + private final String metricName; + private final Object[] values; + private final Measurable measurable; + } +} \ No newline at end of file diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java new file mode 100644 index 0000000000..78fc8fc43d --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java @@ -0,0 +1,80 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import com.newrelic.api.agent.NewRelic; +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; + +public class FiniteMetricRecorderTest { + + private FiniteMetricRecorder recorder; + private static final String METRIC = "metric"; + private static final float VALUE = 42.0f; + private static final int COUNT = 11; + + + @Before + public void setUp() { + recorder = new FiniteMetricRecorder(); + } + + @Test + public void incrementCounter() { + try (MockedStatic newRelic = mockStatic(NewRelic.class)) { + recorder.incrementCounter(METRIC, COUNT); + newRelic.verify(() -> NewRelic.incrementCounter(eq(METRIC), eq(COUNT))); + } + } + + @Test + public void tryRecordMetric() { + try (MockedStatic newRelic = mockStatic(NewRelic.class)) { + boolean returnValue = recorder.tryRecordMetric(METRIC, VALUE); + + assertThat(returnValue, is(true)); + newRelic.verify(() -> NewRelic.recordMetric(eq(METRIC), eq(VALUE))); + } + } + + @Test + public void tryRecordInfiniteMetric() { + try (MockedStatic newRelic = mockStatic(NewRelic.class)) { + boolean returnValue = recorder.tryRecordMetric(METRIC, Float.POSITIVE_INFINITY); + + assertThat(returnValue, is(false)); + newRelic.verifyNoInteractions(); + } + } + + @Test + public void recordMetric() { + try (MockedStatic newRelic = mockStatic(NewRelic.class)) { + recorder.recordMetric(METRIC, VALUE); + newRelic.verify(() -> NewRelic.recordMetric(eq(METRIC), eq(VALUE))); + } + } + + @Test + public void recordInfiniteMetric() { + try (MockedStatic newRelic = mockStatic(NewRelic.class)) { + recorder.tryRecordMetric(METRIC, Float.POSITIVE_INFINITY); + 
newRelic.verifyNoInteractions(); + } + } + +} \ No newline at end of file diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java new file mode 100644 index 0000000000..6d9e1a0d9a --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java @@ -0,0 +1,187 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import org.apache.kafka.common.metrics.KafkaMetric; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.*; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class MetricNameUtilTest { + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private KafkaMetric kafkaMetric; + + @Test + public void buildDisplayName() { + setupKafkaMetric(); + String actual = MetricNameUtil.buildDisplayName(kafkaMetric); + assertThat(actual, equalTo("group/name {}")); + } + + // not testing with more than one tag because iteration order in a Hashmap is not guaranteed. 
+ @Test + public void buildDisplayName_withTag() { + setupKafkaMetric(Tag.TOPIC); + String actual = MetricNameUtil.buildDisplayName(kafkaMetric); + assertThat(actual, equalTo("group/name {topic=t}")); + } + + @Test + public void buildMetricName() { + setupKafkaMetric(); + String actual = MetricNameUtil.buildMetricName(kafkaMetric); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/name")); + } + + @Test + public void buildMetricName_withAllTags() { + setupKafkaMetric(Tag.CLIENT_ID, Tag.TOPIC, Tag.NODE_ID); + String actual = MetricNameUtil.buildMetricName(kafkaMetric); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/topic/t/client/ci/name")); + } + + @Test + public void buildMetricName_withClientId() { + setupKafkaMetric(Tag.CLIENT_ID); + String actual = MetricNameUtil.buildMetricName(kafkaMetric); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/client/ci/name")); + } + + @Test + public void buildMetricName_withTopic() { + setupKafkaMetric(Tag.TOPIC); + String actual = MetricNameUtil.buildMetricName(kafkaMetric); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/name")); + } + + @Test + public void buildMetricName_withNodeId() { + setupKafkaMetric(Tag.NODE_ID); + String actual = MetricNameUtil.buildMetricName(kafkaMetric); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/name")); + } + + @Test + public void buildMetricName_withClientIdTopic() { + setupKafkaMetric(Tag.CLIENT_ID, Tag.TOPIC); + String actual = MetricNameUtil.buildMetricName(kafkaMetric); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/topic/t/client/ci/name")); + } + + @Test + public void buildMetricName_withClientIdNodeId() { + setupKafkaMetric(Tag.CLIENT_ID, Tag.NODE_ID); + String actual = MetricNameUtil.buildMetricName(kafkaMetric); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/node/ni/client/ci/name")); + } + + @Test + public void buildMetricName_withTopicNodeId() { + 
setupKafkaMetric(Tag.TOPIC, Tag.NODE_ID); + String actual = MetricNameUtil.buildMetricName(kafkaMetric); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/name")); + } + + @Test + public void buildMetricName_nameOverride() { + setupKafkaMetric(); + String actual = MetricNameUtil.buildMetricName(kafkaMetric, "diffName"); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/diffName")); + } + + @Test + public void buildMetricName_nameOverride_withAllTags() { + setupKafkaMetric(Tag.CLIENT_ID, Tag.TOPIC, Tag.NODE_ID); + String actual = MetricNameUtil.buildMetricName(kafkaMetric, "diffName"); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/topic/t/client/ci/diffName")); + } + + @Test + public void buildMetricName_nameOverride_withClientId() { + setupKafkaMetric(Tag.CLIENT_ID); + String actual = MetricNameUtil.buildMetricName(kafkaMetric, "diffName"); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/client/ci/diffName")); + } + + @Test + public void buildMetricName_nameOverride_withTopic() { + setupKafkaMetric(Tag.TOPIC); + String actual = MetricNameUtil.buildMetricName(kafkaMetric, "diffName"); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/diffName")); + } + + @Test + public void buildMetricName_nameOverride_withNodeId() { + setupKafkaMetric(Tag.NODE_ID); + String actual = MetricNameUtil.buildMetricName(kafkaMetric, "diffName"); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/diffName")); + } + + @Test + public void buildMetricName_nameOverride_withClientIdTopic() { + setupKafkaMetric(Tag.CLIENT_ID, Tag.TOPIC); + String actual = MetricNameUtil.buildMetricName(kafkaMetric, "diffName"); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/topic/t/client/ci/diffName")); + } + + @Test + public void buildMetricName_nameOverride_withClientIdNodeId() { + setupKafkaMetric(Tag.CLIENT_ID, Tag.NODE_ID); + String actual = MetricNameUtil.buildMetricName(kafkaMetric, 
"diffName"); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/node/ni/client/ci/diffName")); + } + + @Test + public void buildMetricName_nameOverride_withTopicNodeId() { + setupKafkaMetric(Tag.TOPIC, Tag.NODE_ID); + String actual = MetricNameUtil.buildMetricName(kafkaMetric, "diffName"); + assertThat(actual, equalTo("MessageBroker/Kafka/Internal/group/diffName")); + } + + private void setupKafkaMetric(Tag... tags) { + reset(kafkaMetric); + when(kafkaMetric.metricName().group()) + .thenReturn("group"); + when(kafkaMetric.metricName().name()) + .thenReturn("name"); + + Map tagMap = new HashMap<>(); + for (Tag tag : tags) { + tagMap.put(tag.label, tag.value); + } + when(kafkaMetric.metricName().tags()) + .thenReturn(tagMap); + } + + private enum Tag { + CLIENT_ID("client-id", "ci"), + NODE_ID("node-id", "ni"), + TOPIC("topic", "t"), + ; + private final String label; + private final String value; + + Tag(String label, String value) { + this.label = label; + this.value = value; + } + } +} \ No newline at end of file diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java new file mode 100644 index 0000000000..e157b13f2a --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java @@ -0,0 +1,169 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import com.newrelic.agent.introspec.InstrumentationTestConfig; +import com.newrelic.agent.introspec.InstrumentationTestRunner; +import com.newrelic.agent.introspec.Introspector; +import com.newrelic.agent.introspec.TracedMetricData; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * These are not unit tests but a functionality test for the whole class. + * Unit testing the methods would require a lot of reflection to check that it worked. + * It uses the InstrumentationTestRunner even though it is not testing any weave class + * so the introspector processes the calls to NewRelic. This prevents static mocking of + * NewRelic, which is extra complicated because static mocking is thread based, and there + * are other threads in the code being tested. 
+ */ +@RunWith(InstrumentationTestRunner.class) +@InstrumentationTestConfig(includePrefixes = "org.apache.kafka") +public class NewRelicMetricsReporterTest { + private Introspector introspector; + private static final KafkaMetric METRIC1 = getMetricMock("metric1", 42.0f); + private static final KafkaMetric METRIC2 = getMetricMock("metric2", 11.0f); + + @Before + public void setup() { + introspector = InstrumentationTestRunner.getIntrospector(); + } + + @Test + public void initialLoad() throws InterruptedException { + List initialMetrics = Arrays.asList(METRIC1, METRIC2); + + NewRelicMetricsReporter reporter = initMetricsReporter(initialMetrics, Collections.emptyList()); + + Map unscopedMetrics = introspector.getUnscopedMetrics(); + TracedMetricData metric1 = unscopedMetrics.get("MessageBroker/Kafka/Internal/group/metric1"); + assertEquals(42.0f, metric1.getTotalTimeInSec(), 0.1f); + TracedMetricData metric2 = unscopedMetrics.get("MessageBroker/Kafka/Internal/group/metric2"); + assertEquals(11.0f, metric2.getTotalTimeInSec(), 0.1f); + + reporter.close(); + } + + @Test + public void laterLoad() throws Exception { + List otherMetrics = Arrays.asList(METRIC1, METRIC2); + + NewRelicMetricsReporter reporter = initMetricsReporter(Collections.emptyList(), otherMetrics); + + Map unscopedMetrics = introspector.getUnscopedMetrics(); + assertEquals(1, unscopedMetrics.size()); + TracedMetricData nodeMetric = unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42"); + assertEquals(1.0f, nodeMetric.getTotalTimeInSec(), 0.1f); + + forceHarvest(reporter); + + unscopedMetrics = introspector.getUnscopedMetrics(); + TracedMetricData metric1 = unscopedMetrics.get("MessageBroker/Kafka/Internal/group/metric1"); + assertEquals(42.0f, metric1.getTotalTimeInSec(), 0.1f); + TracedMetricData metric2 = unscopedMetrics.get("MessageBroker/Kafka/Internal/group/metric2"); + assertEquals(11.0f, metric2.getTotalTimeInSec(), 0.1f); + + reporter.close(); + } + + @Test + public void removeMetric() 
throws Exception { + List initialMetrics = Arrays.asList(METRIC1, METRIC2); + + NewRelicMetricsReporter reporter = initMetricsReporter(initialMetrics, Collections.emptyList()); + + Map unscopedMetrics = introspector.getUnscopedMetrics(); + assertEquals(3, unscopedMetrics.size()); // metric1, metric2 and node metric + + introspector.clear(); + reporter.metricRemoval(METRIC2); + forceHarvest(reporter); + + unscopedMetrics = introspector.getUnscopedMetrics(); + assertEquals(2, unscopedMetrics.size()); + TracedMetricData metric1 = unscopedMetrics.get("MessageBroker/Kafka/Internal/group/metric1"); + assertEquals(42.0f, metric1.getTotalTimeInSec(), 0.1f); + TracedMetricData nodeMetric = unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42"); + assertEquals(1.0f, nodeMetric.getTotalTimeInSec(), 0.1f); + + reporter.close(); + } + + @Test + public void nodeTopicMetrics() throws InterruptedException{ + KafkaMetric metricWithTopic = getMetricMock("topicMetric", 20.0f); + when(metricWithTopic.metricName().tags().get("topic")).thenReturn("hhgg"); + List initialMetrics = Arrays.asList(metricWithTopic); + + NewRelicMetricsReporter reporter = initMetricsReporter(initialMetrics, Collections.emptyList()); + + Map unscopedMetrics = introspector.getUnscopedMetrics(); + assertEquals(3, unscopedMetrics.size()); + + TracedMetricData metric1 = unscopedMetrics.get("MessageBroker/Kafka/Internal/group/topicMetric"); + assertEquals(20.0f, metric1.getTotalTimeInSec(), 0.1f); + TracedMetricData nodeMetric = unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42"); + assertEquals(1.0f, nodeMetric.getTotalTimeInSec(), 0.1f); + TracedMetricData nodeTopicMetric = unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42/Consume/hhgg"); + assertEquals(1.0f, nodeTopicMetric.getTotalTimeInSec(), 0.1f); + + reporter.close(); + } + + protected static NewRelicMetricsReporter initMetricsReporter(List initMetrics, Collection otherMetrics) throws InterruptedException { + Node node = 
mock(Node.class); + when(node.host()) + .thenReturn("localhost"); + when(node.port()) + .thenReturn(42); + NewRelicMetricsReporter metricsReporter = new NewRelicMetricsReporter(ClientType.CONSUMER, Collections.singleton(node)); + metricsReporter.init(initMetrics); + // init triggers the first harvest that happens in a different thread. Sleeping to let it finish. + Thread.sleep(100L); + + for (KafkaMetric otherMetric : otherMetrics) { + metricsReporter.metricChange(otherMetric); + } + return metricsReporter; + } + + protected static KafkaMetric getMetricMock(String name, Object value) { + KafkaMetric metric = mock(KafkaMetric.class, RETURNS_DEEP_STUBS); + when(metric.metricName().group()) + .thenReturn("group"); + when(metric.metricName().name()) + .thenReturn(name); + when(metric.metricValue()) + .thenReturn(value); + return metric; + } + + + private void forceHarvest(NewRelicMetricsReporter reporter) throws Exception { + Method report = NewRelicMetricsReporter.class.getDeclaredMethod("report"); + if (!report.isAccessible()) { + report.setAccessible(true); + } + report.invoke(reporter); + } +} \ No newline at end of file diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java new file mode 100644 index 0000000000..d0f39cf9f4 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java @@ -0,0 +1,99 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import org.apache.kafka.common.Node; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class NodeTopicRegistryTest { + + @Test + public void singleNodeTest() { + NodeTopicRegistry nodeTopicRegistry = new NodeTopicRegistry(ClientType.CONSUMER, Collections.singleton(getNode("ad"))); + assertTrue(nodeTopicRegistry.register("tm")); + assertFalse(nodeTopicRegistry.register("tm")); + assertTrue(nodeTopicRegistry.register("fp")); + assertTrue(nodeTopicRegistry.register("zb")); + + FiniteMetricRecorder finiteMetricRecorder = mock(FiniteMetricRecorder.class); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyMetrics(finiteMetricRecorder, + "MessageBroker/Kafka/Nodes/ad:42/Consume/tm", + "MessageBroker/Kafka/Nodes/ad:42/Consume/fp", + "MessageBroker/Kafka/Nodes/ad:42/Consume/zb", + "MessageBroker/Kafka/Nodes/ad:42" + ); + + + // verify nothing is reported after close + clearInvocations(finiteMetricRecorder); + + nodeTopicRegistry.close(); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyNoInteractions(finiteMetricRecorder); + } + + @Test + public void multiNodeTest() { + NodeTopicRegistry nodeTopicRegistry = new NodeTopicRegistry(ClientType.PRODUCER, Arrays.asList(getNode("hh"), getNode("gg"))); + assertTrue(nodeTopicRegistry.register("vp")); + assertFalse(nodeTopicRegistry.register("vp")); + assertTrue(nodeTopicRegistry.register("sep")); + + FiniteMetricRecorder 
finiteMetricRecorder = mock(FiniteMetricRecorder.class); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyMetrics(finiteMetricRecorder, + "MessageBroker/Kafka/Nodes/hh:42/Produce/vp", + "MessageBroker/Kafka/Nodes/hh:42/Produce/sep", + "MessageBroker/Kafka/Nodes/hh:42", + "MessageBroker/Kafka/Nodes/gg:42/Produce/vp", + "MessageBroker/Kafka/Nodes/gg:42/Produce/sep", + "MessageBroker/Kafka/Nodes/gg:42" + ); + + + // verify nothing is reported after close + clearInvocations(finiteMetricRecorder); + + nodeTopicRegistry.close(); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyNoInteractions(finiteMetricRecorder); + } + + private Node getNode(String host) { + Node node = Mockito.mock(Node.class); + when(node.host()).thenReturn(host); + when(node.port()).thenReturn(42); + return node; + } + + private void verifyMetrics(FiniteMetricRecorder metricRecorderMock, String ... metrics) { + for (String metric : metrics) { + verify(metricRecorderMock).recordMetric(eq(metric), eq(1.0f)); + } + verifyNoMoreInteractions(metricRecorderMock); + } +} \ No newline at end of file diff --git a/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java new file mode 100644 index 0000000000..ff7865049e --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-2.3.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java @@ -0,0 +1,31 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import org.junit.Test; + +import java.util.concurrent.ThreadFactory; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +public class ThreadFactoriesTest { + + @Test + public void build() { + ThreadFactory threadFactory = ThreadFactories.build("TestService"); + Thread thread1 = threadFactory.newThread(() -> {}); + Thread thread2 = threadFactory.newThread(() -> {}); + + assertThat(thread1.getName(), equalTo("New Relic TestService #1")); + assertThat(thread2.getName(), equalTo("New Relic TestService #2")); + assertThat(thread1.isDaemon(), is(true)); + assertThat(thread2.isDaemon(), is(true)); + } +} \ No newline at end of file diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/build.gradle b/instrumentation/kafka-clients-node-metrics-3.7.0/build.gradle index 5a4e6ef29d..2107fea2ee 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/build.gradle +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/build.gradle @@ -14,7 +14,7 @@ jar { } verifyInstrumentation { - passesOnly 'org.apache.kafka:kafka-clients:[3.7.0,)' + passesOnly 'org.apache.kafka:kafka-clients:[3.7.0,3.9.0)' } diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java index dbe471ab64..083222108e 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetric.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; public interface CachedKafkaMetric { diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java index 3aa651b195..90f317583e 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CachedKafkaMetrics.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import java.util.regex.Pattern; diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java new file mode 100644 index 0000000000..d93d534379 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/ClientType.java @@ -0,0 +1,23 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +public enum ClientType { + CONSUMER("Consume"), + PRODUCER("Produce"); + + private final String operation; + + ClientType(String operation) { + this.operation = operation; + } + + public String getOperation() { + return operation; + } +} diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CumulativeSumSupport.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CumulativeSumSupport.java index 7dc226c077..885379ee26 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CumulativeSumSupport.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CumulativeSumSupport.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; public class CumulativeSumSupport { diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java index 97af0dec88..6c4db810cb 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/FiniteMetricRecorder.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import com.newrelic.api.agent.NewRelic; diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java index 64c8196edc..8fe91a7341 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricNameUtil.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import org.apache.kafka.common.metrics.KafkaMetric; diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java index 22680a9c18..3b41176f13 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java @@ -1,6 +1,15 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import com.newrelic.api.agent.NewRelic; + +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -10,6 +19,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricsReporter; @@ -25,6 +35,11 @@ public class NewRelicMetricsReporter implements MetricsReporter { private final ConcurrentHashMap metrics = new ConcurrentHashMap<>(); private final FiniteMetricRecorder recorder = new FiniteMetricRecorder(); + private final NodeTopicRegistry nodeTopicRegistry; + + public NewRelicMetricsReporter(ClientType clientType, Collection nodes) { + nodeTopicRegistry = new NodeTopicRegistry(clientType, nodes); + } @Override public void init(List metrics) { @@ -36,12 +51,7 @@ public void init(List metrics) { registerMetric(metric); } - future = SCHEDULER.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - report(); - } - }, 0, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS); + future = SCHEDULER.scheduleAtFixedRate(this::report, 0, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS); } @Override @@ -51,8 +61,6 @@ public void metricChange(KafkaMetric metric) { @Override public void metricRemoval(KafkaMetric metric) { - metrics.remove(metric.metricName()); - final CachedKafkaMetric cachedMetric = metrics.remove(metric.metricName()); if (cachedMetric != null) { debugLog("newrelic-kafka-clients-enhancements: deregister metric: {0}", cachedMetric.displayName()); @@ -65,6 +73,8 @@ public void close() { future.cancel(false); future = null; } + metrics.clear(); + nodeTopicRegistry.close(); } @Override @@ -79,12 +89,17 @@ private void registerMetric(final KafkaMetric metric) { return; } - if (TOPIC_METRICS_DISABLED && 
metric.metricName().tags().get("topic") != null) { + String topic = metric.metricName().tags().get("topic"); + if (TOPIC_METRICS_DISABLED && topic != null) { debugLog("newrelic-kafka-clients-enhancements: skipping topic metric registration: {0}", MetricNameUtil.buildDisplayName(metric)); return; } + if (nodeTopicRegistry.register(topic)) { + debugLog("newrelic-kafka-clients-enhancements: register node topic metric for topic: {0}", topic); + } + final CachedKafkaMetric cachedMetric = CachedKafkaMetrics.newCachedKafkaMetric(metric); if (cachedMetric.isValid()) { debugLog("newrelic-kafka-clients-enhancements: register metric: {0}", cachedMetric.displayName()); @@ -101,6 +116,8 @@ private void report() { for (final CachedKafkaMetric metric : metrics.values()) { metric.report(recorder); } + + nodeTopicRegistry.report(recorder); } private void debugLog(String message) { diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java new file mode 100644 index 0000000000..514073adbc --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NodeTopicRegistry.java @@ -0,0 +1,84 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import org.apache.kafka.common.Node; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class is used to track all the metric names that are related to a specific node: + * + * - MessageBroker/Kafka/Nodes/host:port + * - MessageBroker/Kafka/Nodes/host:port/Consume/topicName + * - MessageBroker/Kafka/Nodes/host:port/Produce/topicName + * + * At initialization time we only have the node and the mode (is this a metrics reporter + * for a Kafka consumer or for a Kafka producer?). + * + * Then, as topics are discovered through the metricChange method, the topic metric names are + * generated. This is the best way we have to get track of the topics since they're not + * available when the KafkaConsumer/KafkaProducer is initialized. + * + * For KafkaConsumer, the SubscriptionState doesn't contain the topics and partitions + * at initialization time because it takes time for the rebalance to happen. + * + * For KafkaProducer, topics are dynamic since a producer could send records to any + * topic and the concept of subscription doesn't exist there. + * + * Alternatively we could get the topics from the records in KafkaProducer.doSend or + * KafkaConsumer.poll, and call NewRelicMetricsReporter.addTopicToNodeMetrics from there. + * This approach would have a small impact in performance, and getting the topics from the + * KafkaMetrics is a good enough solution. 
+ */ +public class NodeTopicRegistry { + private final Set recordedTopics = ConcurrentHashMap.newKeySet(); + private final Set metricNames = ConcurrentHashMap.newKeySet(); + private final Set nodes = new HashSet<>(); + private final ClientType clientType; + + private static final String METRIC_PREFIX = "MessageBroker/Kafka/Nodes/"; + + public NodeTopicRegistry(ClientType clientType, Collection nodes) { + this.clientType = clientType; + for (Node node : nodes) { + String nodeName = node.host() + ":" + node.port(); + this.nodes.add(nodeName); + this.metricNames.add(METRIC_PREFIX + nodeName); + } + } + + /** + * @return true if the topic was registered + */ + public boolean register(String topic) { + if (topic != null && recordedTopics.add(topic)) { + for (String node : nodes) { + String metricName = METRIC_PREFIX + node + "/" + clientType.getOperation() + "/" + topic; + this.metricNames.add(metricName); + } + return true; + } + return false; + } + + public void report(FiniteMetricRecorder recorder) { + for (String topicMetric : metricNames) { + recorder.recordMetric(topicMetric, 1.0f); + } + } + + public void close() { + recordedTopics.clear(); + metricNames.clear(); + nodes.clear(); + } +} diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java index 857c8fd807..853b985c01 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/ThreadFactories.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import java.util.concurrent.ThreadFactory; diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/consumer/internals/AsyncKafkaConsumer_Instrumentation.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/consumer/internals/AsyncKafkaConsumer_Instrumentation.java new file mode 100644 index 0000000000..aa9bc1930a --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/consumer/internals/AsyncKafkaConsumer_Instrumentation.java @@ -0,0 +1,51 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.apache.kafka.kafka.clients.consumer.internals; + +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.weaver.NewField; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.WeaveAllConstructors; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.ClientType; +import com.nr.instrumentation.kafka.NewRelicMetricsReporter; +import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metrics.Metrics; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; + +@Weave(originalName = "org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer") +public abstract class AsyncKafkaConsumer_Instrumentation { + + private final Metrics metrics = Weaver.callOriginal(); + private final String clientId = Weaver.callOriginal(); + private final ConsumerMetadata metadata = 
Weaver.callOriginal(); + // It's possible for constructors to be invoked multiple times (e.g. `C() { C("some default") }` ). + // When this happens we don't want to register the metrics reporter multiple times. + @NewField + private boolean metricsReporterInstalled; + + @WeaveAllConstructors + public AsyncKafkaConsumer_Instrumentation() { + if (!metricsReporterInstalled) { + NewRelic.getAgent().getLogger().log(Level.INFO, + "newrelic-kafka-clients-enhancements engaged for consumer {0}", clientId); + metrics.addReporter(new NewRelicMetricsReporter(ClientType.CONSUMER, metadata.fetch().nodes())); + metricsReporterInstalled = true; + } + } +} diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/consumer/internals/LegacyKafkaConsumer_Instrumentation.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/consumer/internals/LegacyKafkaConsumer_Instrumentation.java new file mode 100644 index 0000000000..1fb0481191 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/consumer/internals/LegacyKafkaConsumer_Instrumentation.java @@ -0,0 +1,51 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.apache.kafka.kafka.clients.consumer.internals; + +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.weaver.NewField; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.WeaveAllConstructors; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.ClientType; +import com.nr.instrumentation.kafka.NewRelicMetricsReporter; +import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metrics.Metrics; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; + +@Weave(originalName = "org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer") +public abstract class LegacyKafkaConsumer_Instrumentation { + + private final Metrics metrics = Weaver.callOriginal(); + private final String clientId = Weaver.callOriginal(); + private final ConsumerMetadata metadata = Weaver.callOriginal(); + // It's possible for constructors to be invoked multiple times (e.g. `C() { C("some default") }` ). + // When this happens we don't want to register the metrics reporter multiple times. 
+ @NewField + private boolean metricsReporterInstalled; + + @WeaveAllConstructors + public LegacyKafkaConsumer_Instrumentation() { + if (!metricsReporterInstalled) { + NewRelic.getAgent().getLogger().log(Level.INFO, + "newrelic-kafka-clients-enhancements engaged for consumer {0}", clientId); + metrics.addReporter(new NewRelicMetricsReporter(ClientType.CONSUMER, metadata.fetch().nodes())); + metricsReporterInstalled = true; + } + } +} diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java index c5bec36efa..9d8260c646 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/main/java/org/apache/kafka/kafka/clients/producer/KafkaProducer_Instrumentation.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package org.apache.kafka.kafka.clients.producer; import com.newrelic.api.agent.NewRelic; @@ -5,8 +12,11 @@ import com.newrelic.api.agent.weaver.Weave; import com.newrelic.api.agent.weaver.WeaveAllConstructors; import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.ClientType; import com.nr.instrumentation.kafka.NewRelicMetricsReporter; import java.util.logging.Level; + +import org.apache.kafka.clients.producer.internals.ProducerMetadata; import org.apache.kafka.common.metrics.Metrics; @Weave(originalName = "org.apache.kafka.clients.producer.KafkaProducer") @@ -14,6 +24,7 @@ public class KafkaProducer_Instrumentation { private final Metrics metrics = Weaver.callOriginal(); private final String clientId = Weaver.callOriginal(); + private final ProducerMetadata metadata = Weaver.callOriginal(); // It's possible for constructors to be invoked multiple times (e.g. `C() { C("some default") }` ). // When this happens we don't want to register the metrics reporter multiple times. 
@@ -25,7 +36,7 @@ public KafkaProducer_Instrumentation() { if (!metricsReporterInstalled) { NewRelic.getAgent().getLogger().log(Level.INFO, "newrelic-kafka-clients-enhancements engaged for producer {0}", clientId); - metrics.addReporter(new NewRelicMetricsReporter()); + metrics.addReporter(new NewRelicMetricsReporter(ClientType.PRODUCER, metadata.fetch().nodes())); metricsReporterInstalled = true; } } diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java index 86ed5a5a1c..468630995b 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/CachedKafkaMetricsTest.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import static org.hamcrest.Matchers.equalTo; diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java index e9cad0f3bd..b53b3d30a3 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/FiniteMetricRecorderTest.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import static org.hamcrest.Matchers.is; diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java index 0c881e57f4..7f9b1d2331 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/MetricNameUtilTest.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import static org.hamcrest.Matchers.equalTo; diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java index 4089dc1c8e..e157b13f2a 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/NewRelicMetricsReporterTest.java @@ -1,24 +1,33 @@ -package com.nr.instrumentation.kafka; +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +package com.nr.instrumentation.kafka; import com.newrelic.agent.introspec.InstrumentationTestConfig; import com.newrelic.agent.introspec.InstrumentationTestRunner; import com.newrelic.agent.introspec.Introspector; import com.newrelic.agent.introspec.TracedMetricData; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + import java.lang.reflect.Method; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.kafka.common.metrics.KafkaMetric; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * These are not unit tests but a functionality test for the whole class. 
@@ -62,7 +71,9 @@ public void laterLoad() throws Exception { NewRelicMetricsReporter reporter = initMetricsReporter(Collections.emptyList(), otherMetrics); Map unscopedMetrics = introspector.getUnscopedMetrics(); - assertEquals(0, unscopedMetrics.size()); + assertEquals(1, unscopedMetrics.size()); + TracedMetricData nodeMetric = unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42"); + assertEquals(1.0f, nodeMetric.getTotalTimeInSec(), 0.1f); forceHarvest(reporter); @@ -77,27 +88,55 @@ public void laterLoad() throws Exception { @Test public void removeMetric() throws Exception { - List otherMetrics = Arrays.asList(METRIC1, METRIC2); + List initialMetrics = Arrays.asList(METRIC1, METRIC2); - NewRelicMetricsReporter reporter = initMetricsReporter(otherMetrics, Collections.emptyList()); + NewRelicMetricsReporter reporter = initMetricsReporter(initialMetrics, Collections.emptyList()); Map unscopedMetrics = introspector.getUnscopedMetrics(); - assertEquals(2, unscopedMetrics.size()); + assertEquals(3, unscopedMetrics.size()); // metric1, metric2 and node metric introspector.clear(); reporter.metricRemoval(METRIC2); forceHarvest(reporter); unscopedMetrics = introspector.getUnscopedMetrics(); - assertEquals(1, unscopedMetrics.size()); + assertEquals(2, unscopedMetrics.size()); TracedMetricData metric1 = unscopedMetrics.get("MessageBroker/Kafka/Internal/group/metric1"); assertEquals(42.0f, metric1.getTotalTimeInSec(), 0.1f); + TracedMetricData nodeMetric = unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42"); + assertEquals(1.0f, nodeMetric.getTotalTimeInSec(), 0.1f); + + reporter.close(); + } + + @Test + public void nodeTopicMetrics() throws InterruptedException{ + KafkaMetric metricWithTopic = getMetricMock("topicMetric", 20.0f); + when(metricWithTopic.metricName().tags().get("topic")).thenReturn("hhgg"); + List initialMetrics = Arrays.asList(metricWithTopic); + + NewRelicMetricsReporter reporter = initMetricsReporter(initialMetrics, 
Collections.emptyList()); + + Map unscopedMetrics = introspector.getUnscopedMetrics(); + assertEquals(3, unscopedMetrics.size()); + + TracedMetricData metric1 = unscopedMetrics.get("MessageBroker/Kafka/Internal/group/topicMetric"); + assertEquals(20.0f, metric1.getTotalTimeInSec(), 0.1f); + TracedMetricData nodeMetric = unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42"); + assertEquals(1.0f, nodeMetric.getTotalTimeInSec(), 0.1f); + TracedMetricData nodeTopicMetric = unscopedMetrics.get("MessageBroker/Kafka/Nodes/localhost:42/Consume/hhgg"); + assertEquals(1.0f, nodeTopicMetric.getTotalTimeInSec(), 0.1f); reporter.close(); } protected static NewRelicMetricsReporter initMetricsReporter(List initMetrics, Collection otherMetrics) throws InterruptedException { - NewRelicMetricsReporter metricsReporter = new NewRelicMetricsReporter(); + Node node = mock(Node.class); + when(node.host()) + .thenReturn("localhost"); + when(node.port()) + .thenReturn(42); + NewRelicMetricsReporter metricsReporter = new NewRelicMetricsReporter(ClientType.CONSUMER, Collections.singleton(node)); metricsReporter.init(initMetrics); // init triggers the first harvest that happens in a different thread. Sleeping to let it finish. Thread.sleep(100L); diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java new file mode 100644 index 0000000000..d0f39cf9f4 --- /dev/null +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/NodeTopicRegistryTest.java @@ -0,0 +1,99 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import org.apache.kafka.common.Node; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class NodeTopicRegistryTest { + + @Test + public void singleNodeTest() { + NodeTopicRegistry nodeTopicRegistry = new NodeTopicRegistry(ClientType.CONSUMER, Collections.singleton(getNode("ad"))); + assertTrue(nodeTopicRegistry.register("tm")); + assertFalse(nodeTopicRegistry.register("tm")); + assertTrue(nodeTopicRegistry.register("fp")); + assertTrue(nodeTopicRegistry.register("zb")); + + FiniteMetricRecorder finiteMetricRecorder = mock(FiniteMetricRecorder.class); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyMetrics(finiteMetricRecorder, + "MessageBroker/Kafka/Nodes/ad:42/Consume/tm", + "MessageBroker/Kafka/Nodes/ad:42/Consume/fp", + "MessageBroker/Kafka/Nodes/ad:42/Consume/zb", + "MessageBroker/Kafka/Nodes/ad:42" + ); + + + // verify nothing is reported after close + clearInvocations(finiteMetricRecorder); + + nodeTopicRegistry.close(); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyNoInteractions(finiteMetricRecorder); + } + + @Test + public void multiNodeTest() { + NodeTopicRegistry nodeTopicRegistry = new NodeTopicRegistry(ClientType.PRODUCER, Arrays.asList(getNode("hh"), getNode("gg"))); + assertTrue(nodeTopicRegistry.register("vp")); + assertFalse(nodeTopicRegistry.register("vp")); + assertTrue(nodeTopicRegistry.register("sep")); + + FiniteMetricRecorder 
finiteMetricRecorder = mock(FiniteMetricRecorder.class); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyMetrics(finiteMetricRecorder, + "MessageBroker/Kafka/Nodes/hh:42/Produce/vp", + "MessageBroker/Kafka/Nodes/hh:42/Produce/sep", + "MessageBroker/Kafka/Nodes/hh:42", + "MessageBroker/Kafka/Nodes/gg:42/Produce/vp", + "MessageBroker/Kafka/Nodes/gg:42/Produce/sep", + "MessageBroker/Kafka/Nodes/gg:42" + ); + + + // verify nothing is reported after close + clearInvocations(finiteMetricRecorder); + + nodeTopicRegistry.close(); + nodeTopicRegistry.report(finiteMetricRecorder); + + verifyNoInteractions(finiteMetricRecorder); + } + + private Node getNode(String host) { + Node node = Mockito.mock(Node.class); + when(node.host()).thenReturn(host); + when(node.port()).thenReturn(42); + return node; + } + + private void verifyMetrics(FiniteMetricRecorder metricRecorderMock, String ... metrics) { + for (String metric : metrics) { + verify(metricRecorderMock).recordMetric(eq(metric), eq(1.0f)); + } + verifyNoMoreInteractions(metricRecorderMock); + } +} \ No newline at end of file diff --git a/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java index 7d6cc7015b..9f138d727e 100644 --- a/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java +++ b/instrumentation/kafka-clients-node-metrics-3.7.0/src/test/java/com/nr/instrumentation/kafka/ThreadFactoriesTest.java @@ -1,3 +1,10 @@ +/* + * + * * Copyright 2025 New Relic Corporation. All rights reserved. 
+ * * SPDX-License-Identifier: Apache-2.0 + * + */ + package com.nr.instrumentation.kafka; import static org.hamcrest.Matchers.equalTo; diff --git a/settings.gradle b/settings.gradle index e85a1d231a..7f9d62c541 100644 --- a/settings.gradle +++ b/settings.gradle @@ -251,6 +251,7 @@ include 'instrumentation:kafka-clients-metrics-3.0.0' include 'instrumentation:kafka-clients-metrics-3.7.0' include 'instrumentation:kafka-clients-metrics-3.9.0' include 'instrumentation:kafka-clients-node-metrics-1.0.0' +include 'instrumentation:kafka-clients-node-metrics-2.3.0' include 'instrumentation:kafka-clients-node-metrics-3.7.0' include 'instrumentation:kafka-clients-node-metrics-3.9.0' include 'instrumentation:kafka-clients-spans-0.11.0.0'