From cac14ede80e8e334b7193094cd0c5504814538ec Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 12 Nov 2024 09:22:16 +0100 Subject: [PATCH] Disable context propagation for emitters Make sure to dispatch messagess on named worker thread pool with the message context --- .../messaging/providers/AbstractMediator.java | 20 +++++-- .../connectors/WorkerPoolRegistry.java | 54 +++++++++++++++---- 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java index b75dad90e3..a5a008fd49 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java @@ -13,6 +13,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.function.Function; +import java.util.function.Supplier; import jakarta.enterprise.inject.Instance; @@ -22,6 +23,8 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.operators.uni.builders.UniCreateFromDeferredSupplier; import io.smallrye.reactive.messaging.*; import io.smallrye.reactive.messaging.PublisherDecorator; import io.smallrye.reactive.messaging.keyed.KeyValueExtractor; @@ -183,10 +186,10 @@ protected Uni invokeBlocking(Message message, Object... args) { try { Optional metadata = message != null ? message.getMetadata().get(LocalContextMetadata.class) : Optional.empty(); - Context currentContext = metadata.map(m -> Context.newInstance(m.context())) + Context msgContext = metadata.map(m -> Context.newInstance(m.context())) .orElseGet(Vertx::currentContext); - return workerPoolRegistry.executeWork(currentContext, - Uni.createFrom().deferred(() -> { + return workerPoolRegistry.executeWork(msgContext, + skipContextPropagation(() -> { try { Object result = this.invoker.invoke(args); if (result instanceof CompletionStage) { @@ -209,6 +212,17 @@ protected Uni invokeBlocking(Message message, Object... args) { } } + /** + * Skips Mutiny supplier decoration in order to avoid context propagation. + * + * @param supplier the supplier to skip context propagation + * @return a Uni that skips context propagation + * @param the type of the Uni + */ + public static Uni skipContextPropagation(Supplier> supplier) { + return Infrastructure.onUniCreation(new UniCreateFromDeferredSupplier<>(supplier)); + } + protected CompletionStage> getAckOrCompletion(Message message) { CompletionStage ack = message.ack(); if (ack != null) { diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/WorkerPoolRegistry.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/WorkerPoolRegistry.java index 13bdf45677..8fbe382ea3 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/WorkerPoolRegistry.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/WorkerPoolRegistry.java @@ -30,7 +30,9 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.annotations.Blocking; import io.smallrye.reactive.messaging.providers.helpers.Validation; +import io.vertx.core.impl.ContextInternal; import io.vertx.mutiny.core.Context; +import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.core.WorkerExecutor; @ApplicationScoped @@ -66,32 +68,66 @@ public void init() { } } - public Uni executeWork(Context currentContext, Uni uni, String workerName, boolean ordered) { + public Uni executeWork(Context msgContext, Uni uni, String workerName, boolean ordered) { if (holder == null) { throw new UnsupportedOperationException("@Blocking disabled"); } Objects.requireNonNull(uni, msg.actionNotProvided()); - if (workerName == null) { - if (currentContext != null) { - return currentContext.executeBlocking(Uni.createFrom().deferred(() -> uni), ordered); + if (msgContext != null) { + return msgContext.executeBlocking(uni, ordered); } // No current context, use the Vert.x instance. return holder.vertx().executeBlocking(uni, ordered); } else { - if (currentContext != null) { - return getWorker(workerName).executeBlocking(uni, ordered) + WorkerExecutor worker = getWorker(workerName); + if (msgContext != null) { + return uniOnMessageContext(worker.executeBlocking(uni, ordered), msgContext) .onItemOrFailure().transformToUni((item, failure) -> { return Uni.createFrom().emitter(emitter -> { if (failure != null) { - currentContext.runOnContext(() -> emitter.fail(failure)); + msgContext.runOnContext(() -> emitter.fail(failure)); } else { - currentContext.runOnContext(() -> emitter.complete(item)); + msgContext.runOnContext(() -> emitter.complete(item)); } }); }); } - return getWorker(workerName).executeBlocking(uni, ordered); + return worker.executeBlocking(uni, ordered); + } + } + + private static Uni uniOnMessageContext(Uni uni, Context msgContext) { + if (msgContext != null && !msgContext.equals(Vertx.currentContext())) { + return Uni.createFrom().deferred(() -> uni) + .runSubscriptionOn(r -> new ContextPreservingRunnable(r, msgContext).run()); + } + return uni; + } + + private static final class ContextPreservingRunnable implements Runnable { + + private final Runnable task; + private final io.vertx.core.Context context; + + public ContextPreservingRunnable(Runnable task, Context context) { + this.task = task; + this.context = context.getDelegate(); + } + + @Override + public void run() { + if (context instanceof ContextInternal) { + ContextInternal contextInternal = (ContextInternal) context; + final var previousContext = contextInternal.beginDispatch(); + try { + task.run(); + } finally { + contextInternal.endDispatch(previousContext); + } + } else { + task.run(); + } } }