From fe23425feda552c38b2cb9250f22d960b523eabb Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 9 Nov 2023 16:20:04 +0100 Subject: [PATCH] Context-aware message dispatch on in-memory connector --- .../messaging/memory/InMemoryConnector.java | 109 ++++++--- .../messaging/memory/InMemorySource.java | 8 + .../InMemoryConnectorRunOnContextTest.java | 227 ++++++++++++++++++ .../providers/helpers/MultiUtils.java | 9 +- 4 files changed, 319 insertions(+), 34 deletions(-) create mode 100644 smallrye-reactive-messaging-in-memory/src/test/java/io/smallrye/reactive/messaging/providers/connectors/InMemoryConnectorRunOnContextTest.java diff --git a/smallrye-reactive-messaging-in-memory/src/main/java/io/smallrye/reactive/messaging/memory/InMemoryConnector.java b/smallrye-reactive-messaging-in-memory/src/main/java/io/smallrye/reactive/messaging/memory/InMemoryConnector.java index b704eb2918..6418bc916f 100644 --- a/smallrye-reactive-messaging-in-memory/src/main/java/io/smallrye/reactive/messaging/memory/InMemoryConnector.java +++ b/smallrye-reactive-messaging-in-memory/src/main/java/io/smallrye/reactive/messaging/memory/InMemoryConnector.java @@ -1,31 +1,40 @@ package io.smallrye.reactive.messaging.memory; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING; + import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; import java.util.concurrent.Flow.Processor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.spi.Connector; -import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory; -import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory; -import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; -import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; -import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor; +import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; +import io.smallrye.reactive.messaging.connector.InboundConnector; +import io.smallrye.reactive.messaging.connector.OutboundConnector; import io.smallrye.reactive.messaging.memory.i18n.InMemoryExceptions; -import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; +import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; +import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; +import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.vertx.mutiny.core.Context; +import io.vertx.mutiny.core.Vertx; /** * An implementation of connector used for testing applications without having to use external broker. @@ -34,13 +43,18 @@ */ @ApplicationScoped @Connector(InMemoryConnector.CONNECTOR) -public class InMemoryConnector implements IncomingConnectorFactory, OutgoingConnectorFactory { +@ConnectorAttribute(name = "run-on-vertx-context", type = "boolean", direction = INCOMING, description = "Whether messages are dispatched on the Vert.x context or not.", defaultValue = "false") +@ConnectorAttribute(name = "broadcast", type = "boolean", direction = INCOMING, description = "Whether the messages are dispatched to multiple consumer", defaultValue = "false") +public class InMemoryConnector implements InboundConnector, OutboundConnector { public static final String CONNECTOR = "smallrye-in-memory"; private final Map> sources = new HashMap<>(); private final Map> sinks = new HashMap<>(); + @Inject + ExecutionHolder executionHolder; + /** * Switch the given incoming channel to in-memory. It replaces the previously used connector with the * in-memory connector. @@ -142,19 +156,19 @@ public static void clear() { } @Override - public PublisherBuilder> getPublisherBuilder(Config config) { - String name = config.getOptionalValue("channel-name", String.class) - .orElseThrow(InMemoryExceptions.ex::illegalArgumentInvalidIncomingConfig); - - boolean broadcast = config.getOptionalValue("broadcast", Boolean.class) - .orElse(false); - return sources.computeIfAbsent(name, n -> new InMemorySourceImpl<>(n, broadcast)).source; + public Flow.Publisher> getPublisher(Config config) { + InMemoryConnectorIncomingConfiguration ic = new InMemoryConnectorIncomingConfiguration(config); + String name = ic.getChannel(); + boolean broadcast = ic.getBroadcast(); + Vertx vertx = executionHolder.vertx(); + boolean runOnVertxContext = ic.getRunOnVertxContext(); + return sources.computeIfAbsent(name, n -> new InMemorySourceImpl<>(n, vertx, runOnVertxContext, broadcast)).source; } @Override - public SubscriberBuilder, Void> getSubscriberBuilder(Config config) { - String name = config.getOptionalValue("channel-name", String.class) - .orElseThrow(InMemoryExceptions.ex::illegalArgumentInvalidOutgoingConfig); + public Flow.Subscriber> getSubscriber(Config config) { + InMemoryConnectorOutgoingConfiguration ic = new InMemoryConnectorOutgoingConfiguration(config); + String name = ic.getChannel(); return sinks.computeIfAbsent(name, InMemorySinkImpl::new).sink; } @@ -211,17 +225,21 @@ public InMemorySink sink(String channel) { private static class InMemorySourceImpl implements InMemorySource { private final Processor, Message> processor; - private final PublisherBuilder> source; + private final Flow.Publisher> source; private final String name; + private final Context context; + private boolean runOnVertxContext; - private InMemorySourceImpl(String name, boolean broadcast) { + private InMemorySourceImpl(String name, Vertx vertx, boolean runOnVertxContext, boolean broadcast) { this.name = name; + this.context = vertx.getOrCreateContext(); + this.runOnVertxContext = runOnVertxContext; if (broadcast) { processor = BroadcastProcessor.create(); } else { processor = UnicastProcessor.create(); } - this.source = ReactiveStreams.fromPublisher(AdaptersToReactiveStreams.publisher(processor)); + this.source = Multi.createFrom().publisher(processor); } @Override @@ -233,26 +251,49 @@ public String name() { public InMemorySource send(T messageOrPayload) { if (messageOrPayload instanceof Message) { //noinspection unchecked - processor.onNext((Message) messageOrPayload); + if (runOnVertxContext) { + context.runOnContext( + () -> processor.onNext(ContextAwareMessage.withContextMetadata((Message) messageOrPayload))); + } else { + processor.onNext(ContextAwareMessage.withContextMetadata((Message) messageOrPayload)); + } } else { - processor.onNext(Message.of(messageOrPayload)); + if (runOnVertxContext) { + context.runOnContext(() -> processor.onNext(ContextAwareMessage.of(messageOrPayload))); + } else { + processor.onNext(ContextAwareMessage.of(messageOrPayload)); + } } return this; } + @Override + public InMemorySource runOnVertxContext(boolean runOnVertxContext) { + this.runOnVertxContext = runOnVertxContext; + return this; + } + @Override public void complete() { - processor.onComplete(); + if (runOnVertxContext) { + context.runOnContext(() -> processor.onComplete()); + } else { + processor.onComplete(); + } } @Override public void fail(Throwable failure) { - processor.onError(failure); + if (runOnVertxContext) { + context.runOnContext(() -> processor.onError(failure)); + } else { + processor.onError(failure); + } } } private static class InMemorySinkImpl implements InMemorySink { - private final SubscriberBuilder, Void> sink; + private final Flow.Subscriber> sink; private final List> list = new CopyOnWriteArrayList<>(); private final AtomicReference failure = new AtomicReference<>(); private final AtomicBoolean completed = new AtomicBoolean(); @@ -260,14 +301,16 @@ private static class InMemorySinkImpl implements InMemorySink { private InMemorySinkImpl(String name) { this.name = name; - this.sink = ReactiveStreams.> builder() - .flatMapCompletionStage(m -> { - list.add(m); - return m.ack().thenApply(x -> m); - }) - .onError(err -> failure.compareAndSet(null, err)) - .onComplete(() -> completed.compareAndSet(false, true)) - .ignore(); + this.sink = MultiUtils.via(multi -> multi.call(m -> { + list.add(m); + Uni ack = Uni.createFrom().completionStage(m::ack); + if (m.getMetadata(LocalContextMetadata.class).isPresent()) { + Context ctx = Context.newInstance(m.getMetadata(LocalContextMetadata.class).get().context()); + ack = ack.emitOn(ctx::runOnContext); + } + return ack; + }).onFailure().invoke(err -> failure.compareAndSet(null, err)) + .onCompletion().invoke(() -> completed.compareAndSet(false, true))); } @Override diff --git a/smallrye-reactive-messaging-in-memory/src/main/java/io/smallrye/reactive/messaging/memory/InMemorySource.java b/smallrye-reactive-messaging-in-memory/src/main/java/io/smallrye/reactive/messaging/memory/InMemorySource.java index 31f00057a0..c0624a41ff 100644 --- a/smallrye-reactive-messaging-in-memory/src/main/java/io/smallrye/reactive/messaging/memory/InMemorySource.java +++ b/smallrye-reactive-messaging-in-memory/src/main/java/io/smallrye/reactive/messaging/memory/InMemorySource.java @@ -22,6 +22,14 @@ public interface InMemorySource { */ InMemorySource send(T messageOrPayload); + /** + * The flag to enable dispatching messages on Vert.x context. + * + * @param runOnVertxContext whether to dispatch messages on Vert.x context or not + * @return this to allow chaining calls. + */ + InMemorySource runOnVertxContext(boolean runOnVertxContext); + /** * Sends the completion event. */ diff --git a/smallrye-reactive-messaging-in-memory/src/test/java/io/smallrye/reactive/messaging/providers/connectors/InMemoryConnectorRunOnContextTest.java b/smallrye-reactive-messaging-in-memory/src/test/java/io/smallrye/reactive/messaging/providers/connectors/InMemoryConnectorRunOnContextTest.java new file mode 100644 index 0000000000..98623ebc43 --- /dev/null +++ b/smallrye-reactive-messaging-in-memory/src/test/java/io/smallrye/reactive/messaging/providers/connectors/InMemoryConnectorRunOnContextTest.java @@ -0,0 +1,227 @@ +package io.smallrye.reactive.messaging.providers.connectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.entry; +import static org.awaitility.Awaitility.await; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.reactive.messaging.annotations.Blocking; +import io.smallrye.reactive.messaging.memory.InMemoryConnector; +import io.smallrye.reactive.messaging.memory.InMemorySink; +import io.smallrye.reactive.messaging.memory.InMemorySource; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class InMemoryConnectorRunOnContextTest extends WeldTestBase { + + @BeforeEach + public void install() { + Map conf = new HashMap<>(); + conf.put("mp.messaging.incoming.foo.connector", InMemoryConnector.CONNECTOR); + conf.put("mp.messaging.incoming.foo.data", "not read"); + conf.put("mp.messaging.incoming.foo.run-on-vertx-context", "true"); + conf.put("mp.messaging.outgoing.bar.connector", InMemoryConnector.CONNECTOR); + conf.put("mp.messaging.outgoing.bar.data", "not read"); + installConfig(new MapBasedConfig(conf)); + + } + + @AfterEach + public void cleanup() { + releaseConfig(); + } + + @Test + public void testWithStrings() { + addBeanClass(MyBeanReceivingString.class); + initialize(); + InMemoryConnector bean = container.getBeanManager().createInstance() + .select(InMemoryConnector.class, ConnectorLiteral.of(InMemoryConnector.CONNECTOR)).get(); + assertThat(bean).isNotNull(); + InMemorySink bar = bean.sink("bar"); + InMemorySource foo = bean.source("foo"); + foo.send("hello"); + await().untilAsserted( + () -> assertThat(bar.received()).hasSize(1).extracting(Message::getPayload).containsExactly("HELLO")); + } + + @Test + public void testWithMessages() { + addBeanClass(MyBeanReceivingMessage.class); + initialize(); + AtomicBoolean acked = new AtomicBoolean(); + InMemoryConnector bean = container.getBeanManager().createInstance() + .select(InMemoryConnector.class, ConnectorLiteral.of(InMemoryConnector.CONNECTOR)).get(); + assertThat(bean).isNotNull(); + InMemorySource> foo = bean.source("foo"); + InMemorySink bar = bean.sink("bar"); + + Message msg = Message.of("hello", () -> { + acked.set(true); + CompletableFuture future = new CompletableFuture<>(); + future.complete(null); + return future; + }); + foo.send(msg); + await().untilAsserted( + () -> assertThat(bar.received()).hasSize(1).extracting(Message::getPayload).containsExactly("HELLO")); + assertThat(acked).isTrue(); + } + + @Test + public void testWithMultiplePayloads() { + addBeanClass(MyBeanReceivingString.class); + initialize(); + InMemoryConnector bean = container.getBeanManager().createInstance() + .select(InMemoryConnector.class, ConnectorLiteral.of(InMemoryConnector.CONNECTOR)).get(); + assertThat(bean).isNotNull(); + InMemorySource foo = bean.source("foo"); + InMemorySink bar = bean.sink("bar"); + foo.send("1"); + foo.send("2"); + foo.send("3"); + await().untilAsserted( + () -> assertThat(bar.received()).hasSize(3).extracting(Message::getPayload).containsExactly("1", "2", "3")); + bar.clear(); + foo.send("4"); + foo.send("5"); + foo.send("6"); + foo.complete(); + + await().untilAsserted(() -> { + assertThat(bar.received()).hasSize(3).extracting(Message::getPayload).containsExactly("4", "5", "6"); + assertThat(bar.hasCompleted()).isTrue(); + assertThat(bar.hasFailed()).isFalse(); + }); + } + + @Test + public void testWithFailure() { + addBeanClass(MyBeanReceivingString.class); + initialize(); + InMemoryConnector bean = container.getBeanManager().createInstance() + .select(InMemoryConnector.class, ConnectorLiteral.of(InMemoryConnector.CONNECTOR)).get(); + assertThat(bean).isNotNull(); + InMemorySource foo = bean.source("foo"); + InMemorySink bar = bean.sink("bar"); + foo.send("1"); + foo.send("2"); + foo.send("3"); + await().untilAsserted( + () -> assertThat(bar.received()).hasSize(3).extracting(Message::getPayload).containsExactly("1", "2", "3")); + foo.fail(new Exception("boom")); + + await().untilAsserted(() -> { + assertThat(bar.hasCompleted()).isFalse(); + assertThat(bar.hasFailed()).isTrue(); + assertThat(bar.getFailure()).hasMessageContaining("boom"); + }); + + bar.clear(); + assertThat(bar.hasCompleted()).isFalse(); + assertThat(bar.hasFailed()).isFalse(); + assertThat(bar.getFailure()).isNull(); + + } + + @Test + public void testWithUnknownSource() { + addBeanClass(MyBeanReceivingString.class); + initialize(); + InMemoryConnector bean = container.getBeanManager().createInstance() + .select(InMemoryConnector.class, ConnectorLiteral.of(InMemoryConnector.CONNECTOR)).get(); + assertThat(bean).isNotNull(); + assertThatThrownBy(() -> bean.source("unknown")).isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testWithUnknownSink() { + addBeanClass(MyBeanReceivingString.class); + initialize(); + InMemoryConnector bean = container.getBeanManager().createInstance() + .select(InMemoryConnector.class, ConnectorLiteral.of(InMemoryConnector.CONNECTOR)).get(); + assertThat(bean).isNotNull(); + assertThatThrownBy(() -> bean.sink("unknown")).isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testSwitchAndClear() { + assertThat(System.getProperties()).doesNotContainKeys( + "mp.messaging.incoming.a.connector", "mp.messaging.incoming.b.connector", + "mp.messaging.outgoing.x.connector", "mp.messaging.outgoing.y.connector"); + + InMemoryConnector.switchIncomingChannelsToInMemory("a", "b"); + InMemoryConnector.switchOutgoingChannelsToInMemory("x", "y"); + assertThat(System.getProperties()) + .contains(entry("mp.messaging.incoming.a.connector", InMemoryConnector.CONNECTOR)); + assertThat(System.getProperties()) + .contains(entry("mp.messaging.incoming.b.connector", InMemoryConnector.CONNECTOR)); + assertThat(System.getProperties()) + .contains(entry("mp.messaging.outgoing.x.connector", InMemoryConnector.CONNECTOR)); + assertThat(System.getProperties()) + .contains(entry("mp.messaging.outgoing.y.connector", InMemoryConnector.CONNECTOR)); + + InMemoryConnector.clear(); + + assertThat(System.getProperties()).doesNotContainKeys( + "mp.messaging.incoming.a.connector", "mp.messaging.incoming.b.connector", + "mp.messaging.outgoing.x.connector", "mp.messaging.outgoing.y.connector"); + } + + @Test + public void testSwitchOnApplication() { + addBeanClass(MyBeanReceivingString.class); + Map map1 = InMemoryConnector.switchIncomingChannelsToInMemory("foo"); + Map map2 = InMemoryConnector.switchOutgoingChannelsToInMemory("bar"); + initialize(); + + assertThat(map1).containsExactly(entry("mp.messaging.incoming.foo.connector", InMemoryConnector.CONNECTOR)); + assertThat(map2).containsExactly(entry("mp.messaging.outgoing.bar.connector", InMemoryConnector.CONNECTOR)); + + InMemoryConnector connector = container.getBeanManager().createInstance() + .select(InMemoryConnector.class, ConnectorLiteral.of(InMemoryConnector.CONNECTOR)).get(); + + connector.source("foo").send("hello"); + await().untilAsserted(() -> assertThat(connector.sink("bar").received().stream() + .map(Message::getPayload).collect(Collectors.toList())).containsExactly("HELLO")); + } + + @ApplicationScoped + public static class MyBeanReceivingString { + + @Incoming("foo") + @Outgoing("bar") + @Blocking + public String process(String s) { + return s.toUpperCase(); + } + + } + + @ApplicationScoped + public static class MyBeanReceivingMessage { + + @Incoming("foo") + @Outgoing("bar") + public Message process(Message s) { + return s.withPayload(s.getPayload().toUpperCase()); + } + + } + +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java index cd81e157d4..e9d2975ff3 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java @@ -158,8 +158,11 @@ public void onError(Throwable failure) { if (isDoneOrCancelled()) { return; } - this.done = true; + Flow.Subscriber subscriber = downstream; + if (subscriber != null) { + subscriber.onError(failure); + } } @Override @@ -168,6 +171,10 @@ public void onComplete() { return; } this.done = true; + Flow.Subscriber subscriber = downstream; + if (subscriber != null) { + subscriber.onComplete(); + } } @Override