From 6bafd2b3d5c6cef04dbf20fb8f63ad642c682a55 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Fri, 15 Nov 2024 05:17:29 +0200 Subject: [PATCH] Fill jvm.thread.state attribute for jvm.thread.count metric on jdk8 (#12724) --- .../runtimemetrics/java8/Threads.java | 67 ++++++++++++++++++- .../java8/ThreadsStableSemconvTest.java | 53 ++++++++++++++- 2 files changed, 115 insertions(+), 5 deletions(-) diff --git a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java index 238c15e37a74..d3af5bf00ea9 100644 --- a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java +++ b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java @@ -25,6 +25,8 @@ import java.util.Locale; import java.util.Map; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; import javax.annotation.Nullable; /** @@ -55,11 +57,34 @@ public final class Threads { /** Register observers for java runtime class metrics. */ public static List registerObservers(OpenTelemetry openTelemetry) { - return INSTANCE.registerObservers(openTelemetry, ManagementFactory.getThreadMXBean()); + return INSTANCE.registerObservers(openTelemetry, !isJava9OrNewer()); + } + + private List registerObservers(OpenTelemetry openTelemetry, boolean useThread) { + if (useThread) { + return registerObservers(openTelemetry, Threads::getThreads); + } + return registerObservers(openTelemetry, ManagementFactory.getThreadMXBean()); } // Visible for testing List registerObservers(OpenTelemetry openTelemetry, ThreadMXBean threadBean) { + return registerObservers( + openTelemetry, + isJava9OrNewer() ? Threads::java9AndNewerCallback : Threads::java8Callback, + threadBean); + } + + // Visible for testing + List registerObservers( + OpenTelemetry openTelemetry, Supplier threadSupplier) { + return registerObservers(openTelemetry, Threads::java8ThreadCallback, threadSupplier); + } + + private static List registerObservers( + OpenTelemetry openTelemetry, + Function> callbackProvider, + T threadInfo) { Meter meter = JmxRuntimeMetricsUtil.getMeter(openTelemetry); List observables = new ArrayList<>(); @@ -68,8 +93,7 @@ List registerObservers(OpenTelemetry openTelemetry, ThreadMXBean .upDownCounterBuilder("jvm.thread.count") .setDescription("Number of executing platform threads.") .setUnit("{thread}") - .buildWithCallback( - isJava9OrNewer() ? java9AndNewerCallback(threadBean) : java8Callback(threadBean))); + .buildWithCallback(callbackProvider.apply(threadInfo))); return observables; } @@ -104,6 +128,36 @@ private static Consumer java8Callback(ThreadMXBean th }; } + private static Consumer java8ThreadCallback( + Supplier supplier) { + return measurement -> { + Map counts = new HashMap<>(); + for (Thread thread : supplier.get()) { + Attributes threadAttributes = threadAttributes(thread); + counts.compute(threadAttributes, (k, value) -> value == null ? 1 : value + 1); + } + counts.forEach((threadAttributes, count) -> measurement.record(count, threadAttributes)); + }; + } + + // Visible for testing + static Thread[] getThreads() { + ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); + while (threadGroup.getParent() != null) { + threadGroup = threadGroup.getParent(); + } + // use a slightly larger array in case new threads are created + int count = threadGroup.activeCount() + 10; + Thread[] threads = new Thread[count]; + int resultSize = threadGroup.enumerate(threads); + if (resultSize == threads.length) { + return threads; + } + Thread[] result = new Thread[resultSize]; + System.arraycopy(threads, 0, result, 0, resultSize); + return result; + } + private static Consumer java9AndNewerCallback( ThreadMXBean threadBean) { return measurement -> { @@ -132,5 +186,12 @@ private static Attributes threadAttributes(ThreadInfo threadInfo) { JvmAttributes.JVM_THREAD_DAEMON, isDaemon, JvmAttributes.JVM_THREAD_STATE, threadState); } + private static Attributes threadAttributes(Thread thread) { + boolean isDaemon = thread.isDaemon(); + String threadState = thread.getState().name().toLowerCase(Locale.ROOT); + return Attributes.of( + JvmAttributes.JVM_THREAD_DAEMON, isDaemon, JvmAttributes.JVM_THREAD_STATE, threadState); + } + private Threads() {} } diff --git a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java index 55fb6b35e802..7d9eb8aa994e 100644 --- a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java +++ b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java @@ -18,6 +18,9 @@ import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledForJreRange; import org.junit.jupiter.api.condition.EnabledOnJre; @@ -41,7 +44,7 @@ class ThreadsStableSemconvTest { @Test @EnabledOnJre(JRE.JAVA_8) - void registerObservers_Java8() { + void registerObservers_Java8Jmx() { when(threadBean.getThreadCount()).thenReturn(7); when(threadBean.getDaemonThreadCount()).thenReturn(2); @@ -75,6 +78,45 @@ void registerObservers_Java8() { equalTo(JVM_THREAD_DAEMON, false)))))); } + @Test + void registerObservers_Java8Thread() { + Thread threadInfo1 = mock(Thread.class, new ThreadInfoAnswer(false, Thread.State.RUNNABLE)); + Thread threadInfo2 = mock(Thread.class, new ThreadInfoAnswer(true, Thread.State.WAITING)); + + Thread[] threads = new Thread[] {threadInfo1, threadInfo2}; + + Threads.INSTANCE + .registerObservers(testing.getOpenTelemetry(), () -> threads) + .forEach(cleanup::deferCleanup); + + testing.waitAndAssertMetrics( + "io.opentelemetry.runtime-telemetry-java8", + "jvm.thread.count", + metrics -> + metrics.anySatisfy( + metricData -> + assertThat(metricData) + .hasInstrumentationScope(EXPECTED_SCOPE) + .hasDescription("Number of executing platform threads.") + .hasUnit("{thread}") + .hasLongSumSatisfying( + sum -> + sum.isNotMonotonic() + .hasPointsSatisfying( + point -> + point + .hasValue(1) + .hasAttributesSatisfying( + equalTo(JVM_THREAD_DAEMON, false), + equalTo(JVM_THREAD_STATE, "runnable")), + point -> + point + .hasValue(1) + .hasAttributesSatisfying( + equalTo(JVM_THREAD_DAEMON, true), + equalTo(JVM_THREAD_STATE, "waiting")))))); + } + @Test @EnabledForJreRange(min = JRE.JAVA_9) void registerObservers_Java9AndNewer() { @@ -120,6 +162,13 @@ void registerObservers_Java9AndNewer() { equalTo(JVM_THREAD_STATE, "waiting")))))); } + @Test + void getThreads() { + Thread[] threads = Threads.getThreads(); + Set set = new HashSet<>(Arrays.asList(threads)); + assertThat(set).contains(Thread.currentThread()); + } + static final class ThreadInfoAnswer implements Answer { private final boolean isDaemon; @@ -135,7 +184,7 @@ public Object answer(InvocationOnMock invocation) { String methodName = invocation.getMethod().getName(); if (methodName.equals("isDaemon")) { return isDaemon; - } else if (methodName.equals("getThreadState")) { + } else if (methodName.equals("getThreadState") || methodName.equals("getState")) { return state; } return null;