From 789e57539737a0ee9fb8af13ce6ab62fc00609b2 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Tue, 10 Oct 2023 11:27:44 +0300 Subject: [PATCH 1/2] Improve vertx-sql client context propagation --- .../vertx/v4_0/sql/PoolInstrumentation.java | 1 + .../sql/QueryExecutorInstrumentation.java | 7 +- .../QueryResultBuilderInstrumentation.java | 16 +- .../v4_0/sql/VertxSqlClientSingletons.java | 57 +++++-- .../vertx/v4_0/sql/VertxSqlClientTest.java | 139 ++++++++++++++++++ 5 files changed, 188 insertions(+), 32 deletions(-) diff --git a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/PoolInstrumentation.java b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/PoolInstrumentation.java index f9ff3ee0de77..5513b743215d 100644 --- a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/PoolInstrumentation.java +++ b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/PoolInstrumentation.java @@ -97,6 +97,7 @@ public static void onExit( SqlConnectOptions sqlConnectOptions = virtualField.get(pool); future = VertxSqlClientSingletons.attachConnectOptions(future, sqlConnectOptions); + future = VertxSqlClientSingletons.wrapContext(future); } } } diff --git a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/QueryExecutorInstrumentation.java b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/QueryExecutorInstrumentation.java index 34394e6b66b7..0264f925cac3 100644 --- a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/QueryExecutorInstrumentation.java +++ b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/QueryExecutorInstrumentation.java @@ -6,9 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql; import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; -import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.OTEL_CONTEXT_KEY; -import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.OTEL_PARENT_CONTEXT_KEY; -import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.OTEL_REQUEST_KEY; import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.getSqlConnectOptions; import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.instrumenter; import static net.bytebuddy.matcher.ElementMatchers.isConstructor; @@ -98,9 +95,7 @@ public static void onEnter( context = instrumenter().start(parentContext, otelRequest); scope = context.makeCurrent(); - promiseInternal.context().localContextData().put(OTEL_REQUEST_KEY, otelRequest); - promiseInternal.context().localContextData().put(OTEL_CONTEXT_KEY, context); - promiseInternal.context().localContextData().put(OTEL_PARENT_CONTEXT_KEY, parentContext); + VertxSqlClientSingletons.attachRequest(promiseInternal, otelRequest, context, parentContext); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) diff --git a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/QueryResultBuilderInstrumentation.java b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/QueryResultBuilderInstrumentation.java index bd9544417f13..a364cf873d2f 100644 --- a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/QueryResultBuilderInstrumentation.java +++ b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/QueryResultBuilderInstrumentation.java @@ -13,8 +13,6 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.vertx.core.Promise; -import io.vertx.core.impl.ContextInternal; -import io.vertx.core.impl.future.PromiseInternal; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -40,12 +38,7 @@ public void transform(TypeTransformer transformer) { public static class CompleteAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Scope onEnter(@Advice.FieldValue("handler") Promise promise) { - if (!(promise instanceof PromiseInternal)) { - return null; - } - PromiseInternal promiseInternal = (PromiseInternal) promise; - ContextInternal contextInternal = promiseInternal.context(); - return endQuerySpan(contextInternal.localContextData(), null); + return endQuerySpan(promise, null); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @@ -61,12 +54,7 @@ public static class FailAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Scope onEnter( @Advice.Argument(0) Throwable throwable, @Advice.FieldValue("handler") Promise promise) { - if (!(promise instanceof PromiseInternal)) { - return null; - } - PromiseInternal promiseInternal = (PromiseInternal) promise; - ContextInternal contextInternal = promiseInternal.context(); - return endQuerySpan(contextInternal.localContextData(), throwable); + return endQuerySpan(promise, throwable); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) diff --git a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientSingletons.java b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientSingletons.java index fd9bcd91d084..82800b91d815 100644 --- a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientSingletons.java +++ b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientSingletons.java @@ -19,15 +19,13 @@ import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig; import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.SqlConnection; import io.vertx.sqlclient.impl.SqlClientBase; -import java.util.Map; +import java.util.concurrent.CompletableFuture; public final class VertxSqlClientSingletons { - public static final String OTEL_REQUEST_KEY = "otel.request"; - public static final String OTEL_CONTEXT_KEY = "otel.context"; - public static final String OTEL_PARENT_CONTEXT_KEY = "otel.parent-context"; private static final String INSTRUMENTATION_NAME = "io.opentelemetry.vertx-sql-client-4.0"; private static final Instrumenter INSTRUMENTER; private static final ThreadLocal connectOptions = new ThreadLocal<>(); @@ -66,16 +64,33 @@ public static SqlConnectOptions getSqlConnectOptions() { return connectOptions.get(); } - public static Scope endQuerySpan(Map contextData, Throwable throwable) { - VertxSqlClientRequest otelRequest = - (VertxSqlClientRequest) contextData.remove(OTEL_REQUEST_KEY); - Context otelContext = (Context) contextData.remove(OTEL_CONTEXT_KEY); - Context otelParentContext = (Context) contextData.remove(OTEL_PARENT_CONTEXT_KEY); - if (otelRequest == null || otelContext == null || otelParentContext == null) { + private static final VirtualField, RequestData> requestDataField = + VirtualField.find(Promise.class, RequestData.class); + + public static void attachRequest( + Promise promise, VertxSqlClientRequest request, Context context, Context parentContext) { + requestDataField.set(promise, new RequestData(request, context, parentContext)); + } + + public static Scope endQuerySpan(Promise promise, Throwable throwable) { + RequestData requestData = requestDataField.get(promise); + if (requestData == null) { return null; } - instrumenter().end(otelContext, otelRequest, null, throwable); - return otelParentContext.makeCurrent(); + instrumenter().end(requestData.context, requestData.request, null, throwable); + return requestData.parentContext.makeCurrent(); + } + + static class RequestData { + final VertxSqlClientRequest request; + final Context context; + final Context parentContext; + + RequestData(VertxSqlClientRequest request, Context context, Context parentContext) { + this.request = request; + this.context = context; + this.parentContext = parentContext; + } } // this virtual field is also used in SqlClientBase instrumentation @@ -93,5 +108,23 @@ public static Future attachConnectOptions( }); } + public static Future wrapContext(Future future) { + Context context = Context.current(); + CompletableFuture result = new CompletableFuture<>(); + future + .toCompletionStage() + .whenComplete( + (value, throwable) -> { + try (Scope ignore = context.makeCurrent()) { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + result.complete(value); + } + } + }); + return Future.fromCompletionStage(result); + } + private VertxSqlClientSingletons() {} } diff --git a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientTest.java b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientTest.java index 80a94702f57c..20ea7932367a 100644 --- a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientTest.java +++ b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientTest.java @@ -20,8 +20,10 @@ import static io.opentelemetry.semconv.SemanticAttributes.NET_PEER_PORT; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.TraceAssert; import io.opentelemetry.sdk.trace.data.StatusData; import io.vertx.core.Vertx; import io.vertx.pgclient.PgConnectOptions; @@ -30,10 +32,15 @@ import io.vertx.sqlclient.PoolOptions; import io.vertx.sqlclient.Tuple; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -54,6 +61,9 @@ class VertxSqlClientTest { @RegisterExtension private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + @RegisterExtension + private static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + private static GenericContainer container; private static Vertx vertx; private static Pool pool; @@ -288,4 +298,133 @@ void testWithConnection() throws Exception { assertPreparedSelect(); } + + @Test + void testManyQueries() throws Exception { + int count = 50; + CountDownLatch latch = new CountDownLatch(count); + List> futureList = new ArrayList<>(); + List> resultList = new ArrayList<>(); + for (int i = 0; i < count; i++) { + CompletableFuture future = new CompletableFuture<>(); + futureList.add(future); + resultList.add( + future.whenComplete((rows, throwable) -> testing.runWithSpan("callback", () -> {}))); + } + for (int i = 0; i < count; i++) { + CompletableFuture future = futureList.get(i); + testing.runWithSpan( + "parent", + () -> + pool.query("select * from test") + .execute( + rowSetAsyncResult -> { + if (rowSetAsyncResult.succeeded()) { + future.complete(rowSetAsyncResult.result()); + } else { + future.completeExceptionally(rowSetAsyncResult.cause()); + } + latch.countDown(); + })); + } + latch.await(30, TimeUnit.SECONDS); + for (int i = 0; i < count; i++) { + CompletableFuture result = resultList.get(i); + result.get(10, TimeUnit.SECONDS); + } + + List> assertions = new ArrayList<>(); + for (int i = 0; i < count; i++) { + assertions.add( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL), + span -> + span.hasName("SELECT tempdb.test") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(DB_NAME, DB), + equalTo(DB_USER, USER_DB), + equalTo(DB_STATEMENT, "select * from test"), + equalTo(DB_OPERATION, "SELECT"), + equalTo(DB_SQL_TABLE, "test"), + equalTo(NET_PEER_NAME, "localhost"), + equalTo(NET_PEER_PORT, port)), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + testing.waitAndAssertTraces(assertions); + } + + @Test + void testConcurrency() throws Exception { + int count = 50; + CountDownLatch latch = new CountDownLatch(count); + List> futureList = new ArrayList<>(); + List> resultList = new ArrayList<>(); + for (int i = 0; i < count; i++) { + CompletableFuture future = new CompletableFuture<>(); + futureList.add(future); + resultList.add( + future.whenComplete((rows, throwable) -> testing.runWithSpan("callback", () -> {}))); + } + ExecutorService executorService = Executors.newFixedThreadPool(4); + cleanup.deferCleanup(() -> executorService.shutdown()); + for (int i = 0; i < count; i++) { + CompletableFuture future = futureList.get(i); + executorService.submit( + () -> { + testing + .runWithSpan( + "parent", + () -> + pool.withConnection( + conn -> + conn.preparedQuery("select * from test where id = $1") + .execute(Tuple.of(1)))) + .onComplete( + rowSetAsyncResult -> { + if (rowSetAsyncResult.succeeded()) { + future.complete(rowSetAsyncResult.result()); + } else { + future.completeExceptionally(rowSetAsyncResult.cause()); + } + latch.countDown(); + }); + }); + } + latch.await(30, TimeUnit.SECONDS); + for (int i = 0; i < count; i++) { + CompletableFuture result = resultList.get(i); + result.get(10, TimeUnit.SECONDS); + } + + List> assertions = new ArrayList<>(); + for (int i = 0; i < count; i++) { + assertions.add( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL), + span -> + span.hasName("SELECT tempdb.test") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(DB_NAME, DB), + equalTo(DB_USER, USER_DB), + equalTo(DB_STATEMENT, "select * from test where id = $?"), + equalTo(DB_OPERATION, "SELECT"), + equalTo(DB_SQL_TABLE, "test"), + equalTo(NET_PEER_NAME, "localhost"), + equalTo(NET_PEER_PORT, port)), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + testing.waitAndAssertTraces(assertions); + } } From a475d07de2d0730936605d47d3956b86beb0acf3 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Tue, 10 Oct 2023 16:10:11 +0300 Subject: [PATCH 2/2] address review comments --- .../vertx/v4_0/sql/VertxSqlClientTest.java | 103 +++++++++--------- 1 file changed, 49 insertions(+), 54 deletions(-) diff --git a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientTest.java b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientTest.java index 20ea7932367a..fedb0c944073 100644 --- a/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientTest.java +++ b/instrumentation/vertx/vertx-sql-client-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/sql/VertxSqlClientTest.java @@ -34,6 +34,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -311,8 +312,7 @@ void testManyQueries() throws Exception { resultList.add( future.whenComplete((rows, throwable) -> testing.runWithSpan("callback", () -> {}))); } - for (int i = 0; i < count; i++) { - CompletableFuture future = futureList.get(i); + for (CompletableFuture future : futureList) { testing.runWithSpan( "parent", () -> @@ -328,34 +328,32 @@ void testManyQueries() throws Exception { })); } latch.await(30, TimeUnit.SECONDS); - for (int i = 0; i < count; i++) { - CompletableFuture result = resultList.get(i); + for (CompletableFuture result : resultList) { result.get(10, TimeUnit.SECONDS); } - List> assertions = new ArrayList<>(); - for (int i = 0; i < count; i++) { - assertions.add( - trace -> - trace.hasSpansSatisfyingExactly( - span -> span.hasName("parent").hasKind(SpanKind.INTERNAL), - span -> - span.hasName("SELECT tempdb.test") - .hasKind(SpanKind.CLIENT) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(DB_NAME, DB), - equalTo(DB_USER, USER_DB), - equalTo(DB_STATEMENT, "select * from test"), - equalTo(DB_OPERATION, "SELECT"), - equalTo(DB_SQL_TABLE, "test"), - equalTo(NET_PEER_NAME, "localhost"), - equalTo(NET_PEER_PORT, port)), - span -> - span.hasName("callback") - .hasKind(SpanKind.INTERNAL) - .hasParent(trace.getSpan(0)))); - } + List> assertions = + Collections.nCopies( + count, + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL), + span -> + span.hasName("SELECT tempdb.test") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(DB_NAME, DB), + equalTo(DB_USER, USER_DB), + equalTo(DB_STATEMENT, "select * from test"), + equalTo(DB_OPERATION, "SELECT"), + equalTo(DB_SQL_TABLE, "test"), + equalTo(NET_PEER_NAME, "localhost"), + equalTo(NET_PEER_PORT, port)), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); testing.waitAndAssertTraces(assertions); } @@ -373,8 +371,7 @@ void testConcurrency() throws Exception { } ExecutorService executorService = Executors.newFixedThreadPool(4); cleanup.deferCleanup(() -> executorService.shutdown()); - for (int i = 0; i < count; i++) { - CompletableFuture future = futureList.get(i); + for (CompletableFuture future : futureList) { executorService.submit( () -> { testing @@ -397,34 +394,32 @@ void testConcurrency() throws Exception { }); } latch.await(30, TimeUnit.SECONDS); - for (int i = 0; i < count; i++) { - CompletableFuture result = resultList.get(i); + for (CompletableFuture result : resultList) { result.get(10, TimeUnit.SECONDS); } - List> assertions = new ArrayList<>(); - for (int i = 0; i < count; i++) { - assertions.add( - trace -> - trace.hasSpansSatisfyingExactly( - span -> span.hasName("parent").hasKind(SpanKind.INTERNAL), - span -> - span.hasName("SELECT tempdb.test") - .hasKind(SpanKind.CLIENT) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(DB_NAME, DB), - equalTo(DB_USER, USER_DB), - equalTo(DB_STATEMENT, "select * from test where id = $?"), - equalTo(DB_OPERATION, "SELECT"), - equalTo(DB_SQL_TABLE, "test"), - equalTo(NET_PEER_NAME, "localhost"), - equalTo(NET_PEER_PORT, port)), - span -> - span.hasName("callback") - .hasKind(SpanKind.INTERNAL) - .hasParent(trace.getSpan(0)))); - } + List> assertions = + Collections.nCopies( + count, + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL), + span -> + span.hasName("SELECT tempdb.test") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(DB_NAME, DB), + equalTo(DB_USER, USER_DB), + equalTo(DB_STATEMENT, "select * from test where id = $?"), + equalTo(DB_OPERATION, "SELECT"), + equalTo(DB_SQL_TABLE, "test"), + equalTo(NET_PEER_NAME, "localhost"), + equalTo(NET_PEER_PORT, port)), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); testing.waitAndAssertTraces(assertions); } }