Skip to content

Commit

Permalink
Kafka clients node metrics (#2202)
Browse files Browse the repository at this point in the history
  • Loading branch information
meiao authored Jan 22, 2025
1 parent d5e2196 commit e5bc12c
Show file tree
Hide file tree
Showing 56 changed files with 2,346 additions and 213 deletions.
4 changes: 2 additions & 2 deletions instrumentation/kafka-clients-node-metrics-1.0.0/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
dependencies {
implementation(project(":newrelic-api"))
implementation(project(":newrelic-weaver-api"))
implementation("org.apache.kafka:kafka-clients:3.3.0")
implementation("org.apache.kafka:kafka-clients:2.2.2")

testImplementation("org.awaitility:awaitility:4.2.0")
testImplementation("org.mockito:mockito-inline:4.11.0")
Expand All @@ -14,7 +14,7 @@ jar {
}

verifyInstrumentation {
passesOnly 'org.apache.kafka:kafka-clients:[1.0.0,3.7.0)'
passesOnly 'org.apache.kafka:kafka-clients:[1.0.0,2.3.0)'
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

interface CachedKafkaMetric {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

import org.apache.kafka.common.metrics.KafkaMetric;
Expand All @@ -12,18 +19,6 @@ static CachedKafkaMetric newCachedKafkaMetric(final KafkaMetric metric) {
return new CachedKafkaVersion(metric);
}

Measurable measurable = null;
try {
measurable = metric.measurable();
} catch (final IllegalStateException e) {
}

final boolean isCumulativeSumType = measurable != null &&
CumulativeSumSupport.isCumulativeSumClass(measurable.getClass().getName());
if (isCumulativeSumType) {
return new CachedKafkaCounter(metric);
}

if (!(metric.metricValue() instanceof Number)) {
return new InvalidCachedKafkaMetric(metric);
}
Expand Down Expand Up @@ -104,60 +99,6 @@ public void report(final FiniteMetricRecorder recorder) {
}
}

private static class CachedKafkaCounter implements CachedKafkaMetric {
private final KafkaMetric metric;
private static final Pattern totalPattern = Pattern.compile("-total$");

private final String counterMetricName;
private final String totalMetricName;

private int previous = -1;

public CachedKafkaCounter(final KafkaMetric metric) {
this.metric = metric;

totalMetricName = MetricNameUtil.buildMetricName(metric);

String metricName = metric.metricName().name();
String counterName = totalPattern.matcher(metricName).replaceAll("-counter");
if (counterName.equals(metricName)) {
counterName = metricName + "-counter";
}
counterMetricName = MetricNameUtil.buildMetricName(metric, counterName);
}

@Override
public boolean isValid() {
return true;
}

@Override
public String displayName() {
return MetricNameUtil.buildDisplayName(metric);
}

@Override
public void report(final FiniteMetricRecorder recorder) {
final Number value = ((Number) metric.metricValue());
if (!recorder.tryRecordMetric(totalMetricName, value.floatValue())) {
// we can't trust the last observed value, so reset
previous = -1;
return;
}

final int intValue = value.intValue();
if (previous == -1L) {
previous = intValue;
return;
}

final int delta = intValue - previous;
previous = intValue;

recorder.incrementCounter(counterMetricName, delta);
}
}

private CachedKafkaMetrics() {
// prevents instantiation
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

public enum ClientType {
CONSUMER("Consume"),
PRODUCER("Produce");

private final String operation;

ClientType(String operation) {
this.operation = operation;
}

public String getOperation() {
return operation;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

import com.newrelic.api.agent.NewRelic;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

import org.apache.kafka.common.metrics.KafkaMetric;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

import com.newrelic.api.agent.NewRelic;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -26,23 +35,22 @@ public class NewRelicMetricsReporter implements MetricsReporter {

private final ConcurrentHashMap<MetricName, CachedKafkaMetric> metrics = new ConcurrentHashMap<>();
private final FiniteMetricRecorder recorder = new FiniteMetricRecorder();
private final NodeTopicRegistry nodeTopicRegistry;

public NewRelicMetricsReporter(ClientType clientType, Collection<Node> nodes) {
nodeTopicRegistry = new NodeTopicRegistry(clientType, nodes);
}

@Override
public void init(List<KafkaMetric> metrics) {
NewRelic.getAgent().getLogger().log(Level.INFO,
"newrelic-kafka-clients-enhancements: initializing with SUPPORTS_CUMULATIVE_SUM={0}",
CumulativeSumSupport.isCumulativeSumSupported());
"newrelic-kafka-clients-enhancements: initializing. This version of Kafka does not support cumulative sum.");

for (final KafkaMetric metric : metrics) {
registerMetric(metric);
}

future = SCHEDULER.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
report();
}
}, 0, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
future = SCHEDULER.scheduleAtFixedRate(this::report, 0, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
}

@Override
Expand All @@ -52,8 +60,6 @@ public void metricChange(KafkaMetric metric) {

@Override
public void metricRemoval(KafkaMetric metric) {
metrics.remove(metric.metricName());

final CachedKafkaMetric cachedMetric = metrics.remove(metric.metricName());
if (cachedMetric != null) {
debugLog("newrelic-kafka-clients-enhancements: deregister metric: {0}", cachedMetric.displayName());
Expand All @@ -66,6 +72,8 @@ public void close() {
future.cancel(false);
future = null;
}
metrics.clear();
nodeTopicRegistry.close();
}

@Override
Expand All @@ -80,11 +88,15 @@ private void registerMetric(final KafkaMetric metric) {
return;
}

if (TOPIC_METRICS_DISABLED && metric.metricName().tags().get("topic") != null) {
String topic = metric.metricName().tags().get("topic");
if (TOPIC_METRICS_DISABLED && topic != null) {
debugLog("newrelic-kafka-clients-enhancements: skipping topic metric registration: {0}",
MetricNameUtil.buildDisplayName(metric));
return;
}
if (nodeTopicRegistry.register(topic)) {
debugLog("newrelic-kafka-clients-enhancements: register node topic metric for topic: {0}", topic);
}

final CachedKafkaMetric cachedMetric = CachedKafkaMetrics.newCachedKafkaMetric(metric);
if (cachedMetric.isValid()) {
Expand All @@ -102,6 +114,8 @@ private void report() {
for (final CachedKafkaMetric metric : metrics.values()) {
metric.report(recorder);
}

nodeTopicRegistry.report(recorder);
}

private void debugLog(String message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

import org.apache.kafka.common.Node;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* This class is used to track all the metric names that are related to a specific node:
*
* - MessageBroker/Kafka/Nodes/host:port
* - MessageBroker/Kafka/Nodes/host:port/Consume/topicName
* - MessageBroker/Kafka/Nodes/host:port/Produce/topicName
*
* At initialization time we only have the node and the mode (is this a metrics reporter
* for a Kafka consumer or for a Kafka producer?).
*
* Then, as topics are discovered through the metricChange method, the topic metric names are
* generated. This is the best way we have to get track of the topics since they're not
* available when the KafkaConsumer/KafkaProducer is initialized.
*
* For KafkaConsumer, the SubscriptionState doesn't contain the topics and partitions
* at initialization time because it takes time for the rebalance to happen.
*
* For KafkaProducer, topics are dynamic since a producer could send records to any
* topic and the concept of subscription doesn't exist there.
*
* Alternatively we could get the topics from the records in KafkaProducer.doSend or
* KafkaConsumer.poll, and call NewRelicMetricsReporter.addTopicToNodeMetrics from there.
* This approach would have a small impact in performance, and getting the topics from the
* KafkaMetrics is a good enough solution.
*/
public class NodeTopicRegistry {
private final Set<String> recordedTopics = ConcurrentHashMap.newKeySet();
private final Set<String> metricNames = ConcurrentHashMap.newKeySet();
private final Set<String> nodes = new HashSet<>();
private final ClientType clientType;

private static final String METRIC_PREFIX = "MessageBroker/Kafka/Nodes/";

public NodeTopicRegistry(ClientType clientType, Collection<Node> nodes) {
this.clientType = clientType;
for (Node node : nodes) {
String nodeName = node.host() + ":" + node.port();
this.nodes.add(nodeName);
this.metricNames.add(METRIC_PREFIX + nodeName);
}
}

/**
* @return true if the topic was registered
*/
public boolean register(String topic) {
if (topic != null && recordedTopics.add(topic)) {
for (String node : nodes) {
String metricName = METRIC_PREFIX + node + "/" + clientType.getOperation() + "/" + topic;
this.metricNames.add(metricName);
}
return true;
}
return false;
}

public void report(FiniteMetricRecorder recorder) {
for (String topicMetric : metricNames) {
recorder.recordMetric(topicMetric, 1.0f);
}
}

public void close() {
recordedTopics.clear();
metricNames.clear();
nodes.clear();
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

import java.util.concurrent.ThreadFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.kafka.ClientType;
import com.nr.instrumentation.kafka.NewRelicMetricsReporter;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.common.metrics.Metrics;

import java.util.logging.Level;
Expand All @@ -22,7 +24,7 @@ public class KafkaConsumer_Instrumentation<K, V> {

private final Metrics metrics = Weaver.callOriginal();
private final String clientId = Weaver.callOriginal();

private final Metadata metadata = Weaver.callOriginal();
// It's possible for constructors to be invoked multiple times (e.g. `C() { C("some default") }` ).
// When this happens we don't want to register the metrics reporter multiple times.
@NewField
Expand All @@ -33,7 +35,7 @@ public KafkaConsumer_Instrumentation() {
if (!metricsReporterInstalled) {
NewRelic.getAgent().getLogger().log(Level.INFO,
"newrelic-kafka-clients-enhancements engaged for consumer {0}", clientId);
metrics.addReporter(new NewRelicMetricsReporter());
metrics.addReporter(new NewRelicMetricsReporter(ClientType.CONSUMER, metadata.fetch().nodes()));
metricsReporterInstalled = true;
}
}
Expand Down
Loading

0 comments on commit e5bc12c

Please sign in to comment.