Skip to content

Commit

Permalink
Context-aware message dispatch on in-memory connector
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Nov 13, 2023
1 parent 409c35c commit fe23425
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
package io.smallrye.reactive.messaging.memory;

import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.memory.i18n.InMemoryExceptions;
import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

/**
* An implementation of connector used for testing applications without having to use external broker.
Expand All @@ -34,13 +43,18 @@
*/
@ApplicationScoped
@Connector(InMemoryConnector.CONNECTOR)
public class InMemoryConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
@ConnectorAttribute(name = "run-on-vertx-context", type = "boolean", direction = INCOMING, description = "Whether messages are dispatched on the Vert.x context or not.", defaultValue = "false")
@ConnectorAttribute(name = "broadcast", type = "boolean", direction = INCOMING, description = "Whether the messages are dispatched to multiple consumer", defaultValue = "false")
public class InMemoryConnector implements InboundConnector, OutboundConnector {

public static final String CONNECTOR = "smallrye-in-memory";

private final Map<String, InMemorySourceImpl<?>> sources = new HashMap<>();
private final Map<String, InMemorySinkImpl<?>> sinks = new HashMap<>();

@Inject
ExecutionHolder executionHolder;

/**
* Switch the given <em>incoming</em> channel to in-memory. It replaces the previously used connector with the
* in-memory connector.
Expand Down Expand Up @@ -142,19 +156,19 @@ public static void clear() {
}

@Override
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
String name = config.getOptionalValue("channel-name", String.class)
.orElseThrow(InMemoryExceptions.ex::illegalArgumentInvalidIncomingConfig);

boolean broadcast = config.getOptionalValue("broadcast", Boolean.class)
.orElse(false);
return sources.computeIfAbsent(name, n -> new InMemorySourceImpl<>(n, broadcast)).source;
public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
InMemoryConnectorIncomingConfiguration ic = new InMemoryConnectorIncomingConfiguration(config);
String name = ic.getChannel();
boolean broadcast = ic.getBroadcast();
Vertx vertx = executionHolder.vertx();
boolean runOnVertxContext = ic.getRunOnVertxContext();
return sources.computeIfAbsent(name, n -> new InMemorySourceImpl<>(n, vertx, runOnVertxContext, broadcast)).source;
}

@Override
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
String name = config.getOptionalValue("channel-name", String.class)
.orElseThrow(InMemoryExceptions.ex::illegalArgumentInvalidOutgoingConfig);
public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
InMemoryConnectorOutgoingConfiguration ic = new InMemoryConnectorOutgoingConfiguration(config);
String name = ic.getChannel();
return sinks.computeIfAbsent(name, InMemorySinkImpl::new).sink;
}

Expand Down Expand Up @@ -211,17 +225,21 @@ public <T> InMemorySink<T> sink(String channel) {

private static class InMemorySourceImpl<T> implements InMemorySource<T> {
private final Processor<Message<T>, Message<T>> processor;
private final PublisherBuilder<? extends Message<T>> source;
private final Flow.Publisher<? extends Message<T>> source;
private final String name;
private final Context context;
private boolean runOnVertxContext;

private InMemorySourceImpl(String name, boolean broadcast) {
private InMemorySourceImpl(String name, Vertx vertx, boolean runOnVertxContext, boolean broadcast) {
this.name = name;
this.context = vertx.getOrCreateContext();
this.runOnVertxContext = runOnVertxContext;
if (broadcast) {
processor = BroadcastProcessor.create();
} else {
processor = UnicastProcessor.create();
}
this.source = ReactiveStreams.fromPublisher(AdaptersToReactiveStreams.publisher(processor));
this.source = Multi.createFrom().publisher(processor);
}

@Override
Expand All @@ -233,41 +251,66 @@ public String name() {
public InMemorySource<T> send(T messageOrPayload) {
if (messageOrPayload instanceof Message) {
//noinspection unchecked
processor.onNext((Message<T>) messageOrPayload);
if (runOnVertxContext) {
context.runOnContext(
() -> processor.onNext(ContextAwareMessage.withContextMetadata((Message<T>) messageOrPayload)));
} else {
processor.onNext(ContextAwareMessage.withContextMetadata((Message<T>) messageOrPayload));
}
} else {
processor.onNext(Message.of(messageOrPayload));
if (runOnVertxContext) {
context.runOnContext(() -> processor.onNext(ContextAwareMessage.of(messageOrPayload)));
} else {
processor.onNext(ContextAwareMessage.of(messageOrPayload));
}
}
return this;
}

@Override
public InMemorySource<T> runOnVertxContext(boolean runOnVertxContext) {
this.runOnVertxContext = runOnVertxContext;
return this;
}

@Override
public void complete() {
processor.onComplete();
if (runOnVertxContext) {
context.runOnContext(() -> processor.onComplete());
} else {
processor.onComplete();
}
}

@Override
public void fail(Throwable failure) {
processor.onError(failure);
if (runOnVertxContext) {
context.runOnContext(() -> processor.onError(failure));
} else {
processor.onError(failure);
}
}
}

private static class InMemorySinkImpl<T> implements InMemorySink<T> {
private final SubscriberBuilder<? extends Message<T>, Void> sink;
private final Flow.Subscriber<? extends Message<T>> sink;
private final List<Message<T>> list = new CopyOnWriteArrayList<>();
private final AtomicReference<Throwable> failure = new AtomicReference<>();
private final AtomicBoolean completed = new AtomicBoolean();
private final String name;

private InMemorySinkImpl(String name) {
this.name = name;
this.sink = ReactiveStreams.<Message<T>> builder()
.flatMapCompletionStage(m -> {
list.add(m);
return m.ack().thenApply(x -> m);
})
.onError(err -> failure.compareAndSet(null, err))
.onComplete(() -> completed.compareAndSet(false, true))
.ignore();
this.sink = MultiUtils.via(multi -> multi.call(m -> {
list.add(m);
Uni<Void> ack = Uni.createFrom().completionStage(m::ack);
if (m.getMetadata(LocalContextMetadata.class).isPresent()) {
Context ctx = Context.newInstance(m.getMetadata(LocalContextMetadata.class).get().context());
ack = ack.emitOn(ctx::runOnContext);
}
return ack;
}).onFailure().invoke(err -> failure.compareAndSet(null, err))
.onCompletion().invoke(() -> completed.compareAndSet(false, true)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ public interface InMemorySource<T> {
*/
InMemorySource<T> send(T messageOrPayload);

/**
* The flag to enable dispatching messages on Vert.x context.
*
* @param runOnVertxContext whether to dispatch messages on Vert.x context or not
* @return this to allow chaining calls.
*/
InMemorySource<T> runOnVertxContext(boolean runOnVertxContext);

/**
* Sends the completion event.
*/
Expand Down
Loading

0 comments on commit fe23425

Please sign in to comment.