Skip to content

Commit

Permalink
Merge pull request #2844 from ozangunalp/disable_context_propagation
Browse files Browse the repository at this point in the history
Disable context propagation for blocking message dispatch
  • Loading branch information
cescoffier authored Dec 9, 2024
2 parents 408865a + cac14ed commit 797f74e
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.function.Supplier;

import jakarta.enterprise.inject.Instance;

Expand All @@ -22,6 +23,8 @@

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.uni.builders.UniCreateFromDeferredSupplier;
import io.smallrye.reactive.messaging.*;
import io.smallrye.reactive.messaging.PublisherDecorator;
import io.smallrye.reactive.messaging.keyed.KeyValueExtractor;
Expand Down Expand Up @@ -183,10 +186,10 @@ protected <T> Uni<T> invokeBlocking(Message<?> message, Object... args) {
try {
Optional<LocalContextMetadata> metadata = message != null ? message.getMetadata().get(LocalContextMetadata.class)
: Optional.empty();
Context currentContext = metadata.map(m -> Context.newInstance(m.context()))
Context msgContext = metadata.map(m -> Context.newInstance(m.context()))
.orElseGet(Vertx::currentContext);
return workerPoolRegistry.executeWork(currentContext,
Uni.createFrom().deferred(() -> {
return workerPoolRegistry.executeWork(msgContext,
skipContextPropagation(() -> {
try {
Object result = this.invoker.invoke(args);
if (result instanceof CompletionStage) {
Expand All @@ -209,6 +212,17 @@ protected <T> Uni<T> invokeBlocking(Message<?> message, Object... args) {
}
}

/**
* Skips Mutiny supplier decoration in order to avoid context propagation.
*
* @param supplier the supplier to skip context propagation
* @return a Uni that skips context propagation
* @param <T> the type of the Uni
*/
public static <T> Uni<T> skipContextPropagation(Supplier<Uni<? extends T>> supplier) {
return Infrastructure.onUniCreation(new UniCreateFromDeferredSupplier<>(supplier));
}

protected CompletionStage<Message<?>> getAckOrCompletion(Message<?> message) {
CompletionStage<Void> ack = message.ack();
if (ack != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.vertx.core.impl.ContextInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.WorkerExecutor;

@ApplicationScoped
Expand Down Expand Up @@ -66,32 +68,66 @@ public void init() {
}
}

public <T> Uni<T> executeWork(Context currentContext, Uni<T> uni, String workerName, boolean ordered) {
public <T> Uni<T> executeWork(Context msgContext, Uni<T> uni, String workerName, boolean ordered) {
if (holder == null) {
throw new UnsupportedOperationException("@Blocking disabled");
}
Objects.requireNonNull(uni, msg.actionNotProvided());

if (workerName == null) {
if (currentContext != null) {
return currentContext.executeBlocking(Uni.createFrom().deferred(() -> uni), ordered);
if (msgContext != null) {
return msgContext.executeBlocking(uni, ordered);
}
// No current context, use the Vert.x instance.
return holder.vertx().executeBlocking(uni, ordered);
} else {
if (currentContext != null) {
return getWorker(workerName).executeBlocking(uni, ordered)
WorkerExecutor worker = getWorker(workerName);
if (msgContext != null) {
return uniOnMessageContext(worker.executeBlocking(uni, ordered), msgContext)
.onItemOrFailure().transformToUni((item, failure) -> {
return Uni.createFrom().emitter(emitter -> {
if (failure != null) {
currentContext.runOnContext(() -> emitter.fail(failure));
msgContext.runOnContext(() -> emitter.fail(failure));
} else {
currentContext.runOnContext(() -> emitter.complete(item));
msgContext.runOnContext(() -> emitter.complete(item));
}
});
});
}
return getWorker(workerName).executeBlocking(uni, ordered);
return worker.executeBlocking(uni, ordered);
}
}

private static <T> Uni<T> uniOnMessageContext(Uni<T> uni, Context msgContext) {
if (msgContext != null && !msgContext.equals(Vertx.currentContext())) {
return Uni.createFrom().deferred(() -> uni)
.runSubscriptionOn(r -> new ContextPreservingRunnable(r, msgContext).run());
}
return uni;
}

private static final class ContextPreservingRunnable implements Runnable {

private final Runnable task;
private final io.vertx.core.Context context;

public ContextPreservingRunnable(Runnable task, Context context) {
this.task = task;
this.context = context.getDelegate();
}

@Override
public void run() {
if (context instanceof ContextInternal) {
ContextInternal contextInternal = (ContextInternal) context;
final var previousContext = contextInternal.beginDispatch();
try {
task.run();
} finally {
contextInternal.endDispatch(previousContext);
}
} else {
task.run();
}
}
}

Expand Down

0 comments on commit 797f74e

Please sign in to comment.