diff --git a/api/src/main/java/io/smallrye/reactive/messaging/PublisherDecorator.java b/api/src/main/java/io/smallrye/reactive/messaging/PublisherDecorator.java index 5e79fef5b5..41cef8bb44 100644 --- a/api/src/main/java/io/smallrye/reactive/messaging/PublisherDecorator.java +++ b/api/src/main/java/io/smallrye/reactive/messaging/PublisherDecorator.java @@ -34,7 +34,7 @@ public interface PublisherDecorator extends Prioritized { * @return the extended multi * @deprecated replaced with {@link #decorate(Multi, List, boolean)} */ - @Deprecated(since = "4.10.1") + @Deprecated(since = "4.12.0") default Multi> decorate(Multi> publisher, String channelName, boolean isConnector) { return publisher; diff --git a/api/src/main/java/io/smallrye/reactive/messaging/observation/DefaultMessageObservation.java b/api/src/main/java/io/smallrye/reactive/messaging/observation/DefaultMessageObservation.java new file mode 100644 index 0000000000..d8878fc08f --- /dev/null +++ b/api/src/main/java/io/smallrye/reactive/messaging/observation/DefaultMessageObservation.java @@ -0,0 +1,77 @@ +package io.smallrye.reactive.messaging.observation; + +import java.time.Duration; + +import org.eclipse.microprofile.reactive.messaging.Message; + +/** + * The default implementation based on system nano time. + */ +public class DefaultMessageObservation implements MessageObservation { + + // metadata + private final String channelName; + + // time + private final long creation; + protected volatile long completion; + + // status + protected volatile boolean done; + protected volatile Throwable nackReason; + + public DefaultMessageObservation(String channelName) { + this(channelName, System.nanoTime()); + } + + public DefaultMessageObservation(String channelName, long creationTime) { + this.channelName = channelName; + this.creation = creationTime; + } + + @Override + public String getChannel() { + return channelName; + } + + @Override + public long getCreationTime() { + return creation; + } + + @Override + public long getCompletionTime() { + return completion; + } + + @Override + public boolean isDone() { + return done || nackReason != null; + } + + @Override + public Throwable getReason() { + return nackReason; + } + + @Override + public Duration getCompletionDuration() { + if (isDone()) { + return Duration.ofNanos(completion - creation); + } + return null; + } + + @Override + public void onMessageAck(Message message) { + completion = System.nanoTime(); + done = true; + } + + @Override + public void onMessageNack(Message message, Throwable reason) { + completion = System.nanoTime(); + nackReason = reason; + } + +} diff --git a/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservation.java b/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservation.java new file mode 100644 index 0000000000..8045d7c79c --- /dev/null +++ b/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservation.java @@ -0,0 +1,56 @@ +package io.smallrye.reactive.messaging.observation; + +import java.time.Duration; + +import org.eclipse.microprofile.reactive.messaging.Message; + +/** + * The message observation contract + */ +public interface MessageObservation { + + /** + * @return the channel name of the message + */ + String getChannel(); + + /** + * @return the creation time of the message in system nanos + */ + long getCreationTime(); + + /** + * @return the completion time of the message in system nanos + */ + long getCompletionTime(); + + /** + * + * @return the duration between creation and the completion time, null if message processing is not completed + */ + Duration getCompletionDuration(); + + /** + * + * @return {@code true} if the message processing is completed with acknowledgement or negative acknowledgement + */ + boolean isDone(); + + /** + * @return the negative acknowledgement reason + */ + Throwable getReason(); + + /** + * Notify the observation of acknowledgement event + * + */ + void onMessageAck(Message message); + + /** + * Notify the observation of negative acknowledgement event + * + * @param reason the reason of the negative acknowledgement + */ + void onMessageNack(Message message, Throwable reason); +} diff --git a/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservationCollector.java b/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservationCollector.java new file mode 100644 index 0000000000..2abd059d85 --- /dev/null +++ b/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservationCollector.java @@ -0,0 +1,44 @@ +package io.smallrye.reactive.messaging.observation; + +import org.eclipse.microprofile.reactive.messaging.Message; + +/** + * The observation collector is called with the new message and returns the message observation that will be used + * to observe messages from their creation until the ack or the nack event + *

+ * + *

+ * The implementation of this interface must be a CDI managed bean in order to be discovered + * + * @param the type of the observation context + */ +public interface MessageObservationCollector { + + /** + * Initialize observation for the given channel + * If {@code null} is returned the observation for the given channel is disabled + * + * @param channel the channel of the message + * @param incoming whether the channel is incoming or outgoing + * @param emitter whether the channel is an emitter + * @return the observation context + */ + default T initObservation(String channel, boolean incoming, boolean emitter) { + // enabled by default + return (T) ObservationContext.DEFAULT; + } + + /** + * Returns a new {@link MessageObservation} object on which to collect the message processing events. + * If {@link #initObservation(String, boolean, boolean)} is implemented, + * the {@link ObservationContext} object returned from that method will be passed to this method. + * If not it is called with {@link ObservationContext#DEFAULT} and should be ignored. + * + * @param channel the channel of the message + * @param message the message + * @param observationContext the observation context + * @return the message observation + */ + MessageObservation onNewMessage(String channel, Message message, T observationContext); + +} diff --git a/api/src/main/java/io/smallrye/reactive/messaging/observation/ObservationContext.java b/api/src/main/java/io/smallrye/reactive/messaging/observation/ObservationContext.java new file mode 100644 index 0000000000..874baa4cff --- /dev/null +++ b/api/src/main/java/io/smallrye/reactive/messaging/observation/ObservationContext.java @@ -0,0 +1,22 @@ +package io.smallrye.reactive.messaging.observation; + +/** + * The per-channel context of the Message observation. + * It is created at the observation initialization by-channel and included at each message observation calls. + */ +public interface ObservationContext { + + /** + * Default no-op observation context + */ + ObservationContext DEFAULT = observation -> { + + }; + + /** + * Called after observation is completed. + * + * @param observation the completed message observation + */ + void complete(MessageObservation observation); +} diff --git a/documentation/mkdocs.yml b/documentation/mkdocs.yml index c4472d6b1d..dfc6d4c5db 100644 --- a/documentation/mkdocs.yml +++ b/documentation/mkdocs.yml @@ -29,6 +29,7 @@ nav: - '@Outgoings' : concepts/outgoings.md - 'Testing' : concepts/testing.md - 'Logging' : concepts/logging.md + - 'Observability API' : concepts/observability.md - 'Advanced Configuration' : concepts/advanced-config.md - 'Message Context' : concepts/message-context.md - 'Metadata Injection': concepts/incoming-metadata-injection.md diff --git a/documentation/src/main/docs/concepts/observability.md b/documentation/src/main/docs/concepts/observability.md new file mode 100644 index 0000000000..a69302ed68 --- /dev/null +++ b/documentation/src/main/docs/concepts/observability.md @@ -0,0 +1,30 @@ +# Observability API + +!!!important + Observability API is experimental and SmallRye only feature. + +Smallrye Reactive Messaging proposes an observability API that allows to observe messages received and send through inbound and outbound channels. + +For any observation to happen, you need to provide an implementation of the `MessageObservationCollector`, discovered as a CDI-managed bean. + +At wiring time the discovered `MessageObservationCollector` implementation `initObservation` method is called once per channel to initialize the `ObservationContext`. +The default `initObservation` implementation returns a default `ObservationContext` object, +but the collector implementation can provide a custom per-channel `ObservationContext` object that'll hold information necessary for the observation. +The `ObservationContext#complete` method is called each time a message observation is completed – message being acked or nacked. +The collector implementation can decide at initialization time to disable the observation per channel by returning a `null` observation context. + +For each new message, the collector is on `onNewMessage` method with the channel name, the `Message` and the `ObservationContext` object initialized beforehand. +This method can react to the creation of a new message but also is responsible for instantiating and returning a `MessageObservation`. +While custom implementations can augment the observability capability, SmallRye Reactive Messaging provides a default implementation `DefaultMessageObservation`. + +So a simple observability collector can be implemented as such: + +``` java +{{ insert('observability/SimpleMessageObservationCollector.java', ) }} +``` + +A collector with a custom `ObservationContext` can be implemented as such : + +``` java +{{ insert('observability/ContextMessageObservationCollector.java', ) }} +``` diff --git a/documentation/src/main/java/observability/ContextMessageObservationCollector.java b/documentation/src/main/java/observability/ContextMessageObservationCollector.java new file mode 100644 index 0000000000..608e368dc1 --- /dev/null +++ b/documentation/src/main/java/observability/ContextMessageObservationCollector.java @@ -0,0 +1,57 @@ +package observability; + +import java.time.Duration; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.reactive.messaging.observation.DefaultMessageObservation; +import io.smallrye.reactive.messaging.observation.MessageObservation; +import io.smallrye.reactive.messaging.observation.MessageObservationCollector; +import io.smallrye.reactive.messaging.observation.ObservationContext; + +@ApplicationScoped +public class ContextMessageObservationCollector + implements MessageObservationCollector { + + @Override + public MyContext initObservation(String channel, boolean incoming, boolean emitter) { + // Called on observation setup, per channel + // if returned null the observation for that channel is disabled + return new MyContext(channel, incoming, emitter); + } + + @Override + public MessageObservation onNewMessage(String channel, Message message, MyContext ctx) { + // Called after message has been created + return new DefaultMessageObservation(channel); + } + + public static class MyContext implements ObservationContext { + + private final String channel; + private final boolean incoming; + private final boolean emitter; + + public MyContext(String channel, boolean incoming, boolean emitter) { + this.channel = channel; + this.incoming = incoming; + this.emitter = emitter; + } + + @Override + public void complete(MessageObservation observation) { + // called after message processing has completed and observation is done + // register duration + Duration duration = observation.getCompletionDuration(); + Throwable reason = observation.getReason(); + if (reason != null) { + // message was nacked + } else { + // message was acked successfully + } + } + } + +} diff --git a/documentation/src/main/java/observability/SimpleMessageObservationCollector.java b/documentation/src/main/java/observability/SimpleMessageObservationCollector.java new file mode 100644 index 0000000000..838bceb5fa --- /dev/null +++ b/documentation/src/main/java/observability/SimpleMessageObservationCollector.java @@ -0,0 +1,21 @@ +package observability; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.reactive.messaging.observation.DefaultMessageObservation; +import io.smallrye.reactive.messaging.observation.MessageObservation; +import io.smallrye.reactive.messaging.observation.MessageObservationCollector; +import io.smallrye.reactive.messaging.observation.ObservationContext; + +@ApplicationScoped +public class SimpleMessageObservationCollector implements MessageObservationCollector { + + @Override + public MessageObservation onNewMessage(String channel, Message message, ObservationContext ctx) { + // Called after message has been created + return new DefaultMessageObservation(channel); + } + +} diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java index 0886d8abef..a8d6ce83a5 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java @@ -36,6 +36,8 @@ import io.smallrye.reactive.messaging.providers.extension.LegacyEmitterFactoryImpl; import io.smallrye.reactive.messaging.providers.extension.MediatorManager; import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl; +import io.smallrye.reactive.messaging.providers.extension.ObservationDecorator; +import io.smallrye.reactive.messaging.providers.extension.OutgoingObservationDecorator; import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension; import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory; import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories; @@ -102,6 +104,8 @@ public void initWeld() { weld.addBeanClass(MetricDecorator.class); weld.addBeanClass(MicrometerDecorator.class); weld.addBeanClass(ContextDecorator.class); + weld.addBeanClass(ObservationDecorator.class); + weld.addBeanClass(OutgoingObservationDecorator.class); weld.disableDiscovery(); } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/metrics/ObservationTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/metrics/ObservationTest.java new file mode 100644 index 0000000000..636dc00b69 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/metrics/ObservationTest.java @@ -0,0 +1,251 @@ +package io.smallrye.reactive.messaging.kafka.metrics; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; +import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase; +import io.smallrye.reactive.messaging.kafka.metrics.ObservationTest.MyReactiveMessagingMessageObservationCollector.KafkaMessageObservation; +import io.smallrye.reactive.messaging.observation.DefaultMessageObservation; +import io.smallrye.reactive.messaging.observation.MessageObservation; +import io.smallrye.reactive.messaging.observation.MessageObservationCollector; +import io.smallrye.reactive.messaging.observation.ObservationContext; + +public class ObservationTest extends KafkaCompanionTestBase { + + @Test + void testConsumeIndividualMessages() { + addBeans(MyReactiveMessagingMessageObservationCollector.class); + addBeans(MyConsumingApp.class); + + runApplication(kafkaConfig("mp.messaging.incoming.kafka", false) + .with("topic", topic) + .with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()) + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())); + + companion.produceStrings() + .fromMulti(Multi.createFrom().range(0, 5).map(i -> new ProducerRecord<>(topic, null, Integer.toString(i)))); + + MyReactiveMessagingMessageObservationCollector reporter = get(MyReactiveMessagingMessageObservationCollector.class); + await().untilAsserted(() -> { + assertThat(reporter.getObservations()).hasSize(5); + assertThat(reporter.getObservations()).allSatisfy(obs -> { + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime()); + assertThat(obs.getReason()).isNull(); + }); + }); + } + + @Test + void testConsumeBatchMessages() { + addBeans(MyReactiveMessagingMessageObservationCollector.class); + addBeans(MyBatchConsumingApp.class); + + runApplication(kafkaConfig("mp.messaging.incoming.kafka", false) + .with("topic", topic) + .with("batch", true) + .with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()) + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())); + + companion.produceStrings() + .fromMulti(Multi.createFrom().range(0, 1000).map(i -> new ProducerRecord<>(topic, null, Integer.toString(i)))); + + MyReactiveMessagingMessageObservationCollector reporter = get(MyReactiveMessagingMessageObservationCollector.class); + MyBatchConsumingApp batches = get(MyBatchConsumingApp.class); + await().untilAsserted(() -> { + assertThat(batches.count()).isEqualTo(1000); + assertThat(reporter.getObservations()).allSatisfy(obs -> { + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime()); + assertThat(obs.getReason()).isNull(); + }); + }); + } + + @Test + void testProducer() { + addBeans(MyReactiveMessagingMessageObservationCollector.class); + addBeans(MyProducerApp.class); + + runApplication(kafkaConfig("mp.messaging.outgoing.kafka", false) + .with("topic", topic) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())); + + MyReactiveMessagingMessageObservationCollector reporter = get(MyReactiveMessagingMessageObservationCollector.class); + MyProducerApp producer = get(MyProducerApp.class); + await().untilAsserted(() -> { + assertThat(producer.count()).isEqualTo(5); + assertThat(reporter.getObservations()).hasSize(5); + assertThat(reporter.getObservations()).allSatisfy(obs -> { + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime()); + assertThat(obs.getReason()).isNull(); + }); + }); + } + + @Test + void testEmitterProducer() { + addBeans(MyReactiveMessagingMessageObservationCollector.class); + addBeans(MyEmitterProducerApp.class); + + runApplication(kafkaConfig("mp.messaging.outgoing.kafka", false) + .with("topic", topic) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())); + + MyReactiveMessagingMessageObservationCollector reporter = get(MyReactiveMessagingMessageObservationCollector.class); + MyEmitterProducerApp producer = get(MyEmitterProducerApp.class); + producer.produce(); + await().untilAsserted(() -> { + assertThat(producer.count()).isEqualTo(5); + assertThat(reporter.getObservations()).hasSize(5); + assertThat(reporter.getObservations()).allSatisfy(obs -> { + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime()); + assertThat(obs.getReason()).isNull(); + }); + }); + } + + @ApplicationScoped + public static class MyConsumingApp { + @Incoming("kafka") + public void consume(String ignored, KafkaMessageObservation metadata) { + assertThat(metadata).isNotNull(); + } + } + + @ApplicationScoped + public static class MyBatchConsumingApp { + + AtomicInteger count = new AtomicInteger(); + + @Incoming("kafka") + public void consume(List s, KafkaMessageObservation metadata) { + assertThat(metadata).isNotNull(); + count.addAndGet(s.size()); + } + + public int count() { + return count.get(); + } + } + + @ApplicationScoped + public static class MyProducerApp { + + AtomicInteger acked = new AtomicInteger(); + + @Outgoing("kafka") + public Multi> produce() { + return Multi.createFrom().items("1", "2", "3", "4", "5") + .map(s -> Message.of(s, () -> { + acked.incrementAndGet(); + return CompletableFuture.completedFuture(null); + })); + } + + public int count() { + return acked.get(); + } + } + + @ApplicationScoped + public static class MyEmitterProducerApp { + AtomicInteger acked = new AtomicInteger(); + + @Inject + @Channel("kafka") + Emitter emitter; + + public void produce() { + for (int i = 0; i < 5; i++) { + emitter.send(Message.of(String.valueOf(i + 1), () -> { + acked.incrementAndGet(); + return CompletableFuture.completedFuture(null); + })); + } + } + + public int count() { + return acked.get(); + } + } + + @ApplicationScoped + public static class MyReactiveMessagingMessageObservationCollector + implements MessageObservationCollector { + + private final List observations = new CopyOnWriteArrayList<>(); + + @Override + public MessageObservation onNewMessage(String channel, Message message, ObservationContext ctx) { + KafkaMessageObservation observation = new KafkaMessageObservation(channel, message); + observations.add(observation); + return observation; + } + + public List getObservations() { + return observations; + } + + public static class KafkaMessageObservation extends DefaultMessageObservation { + final long recordTs; + final long createdMs; + volatile long completedMs; + + public KafkaMessageObservation(String channel, Message message) { + super(channel); + this.createdMs = System.currentTimeMillis(); + Optional metadata = message.getMetadata(IncomingKafkaRecordMetadata.class); + if (metadata.isPresent()) { + Instant inst = metadata.get().getTimestamp(); + recordTs = inst.toEpochMilli(); + System.out.println("record " + recordTs); + } else { + recordTs = 0L; + } + } + + @Override + public void onMessageAck(Message message) { + super.onMessageAck(message); + completedMs = System.currentTimeMillis(); + System.out.println("completed in " + completedMs); + done = true; + } + } + } + +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ObservationDecorator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ObservationDecorator.java new file mode 100644 index 0000000000..410db59a1d --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ObservationDecorator.java @@ -0,0 +1,104 @@ +package io.smallrye.reactive.messaging.providers.extension; + +import static io.smallrye.mutiny.unchecked.Unchecked.consumer; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.Optional; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.ChannelRegistry; +import io.smallrye.reactive.messaging.PublisherDecorator; +import io.smallrye.reactive.messaging.observation.MessageObservation; +import io.smallrye.reactive.messaging.observation.MessageObservationCollector; +import io.smallrye.reactive.messaging.observation.ObservationContext; +import io.smallrye.reactive.messaging.providers.ProcessingException; + +@ApplicationScoped +public class ObservationDecorator implements PublisherDecorator { + + @Inject + @ConfigProperty(name = "smallrye.messaging.observation.enabled", defaultValue = "true") + boolean enabled; + + @Inject + ChannelRegistry registry; + + @Inject + Instance> observationCollector; + + @Override + public Multi> decorate(Multi> multi, List channelName, + boolean isConnector) { + String channel = channelName.isEmpty() ? null : channelName.get(0); + boolean isEmitter = registry.getEmitterNames().contains(channel); + if (observationCollector.isResolvable() && enabled && (isConnector || isEmitter)) { + // if this is an emitter channel than it is an outgoing channel => incoming=false + return decorateObservation(observationCollector.get(), multi, channel, !isEmitter, isEmitter); + } + return multi; + } + + static Multi> decorateObservation( + MessageObservationCollector obsCollector, + Multi> multi, + String channel, + boolean incoming, + boolean emitter) { + MessageObservationCollector collector = (MessageObservationCollector) obsCollector; + ObservationContext context = collector.initObservation(channel, incoming, emitter); + if (context == null) { + return multi; + } + return multi.map(message -> { + MessageObservation observation = collector.onNewMessage(channel, message, context); + if (observation != null) { + return message.addMetadata(observation) + .thenApply(msg -> msg.withAckWithMetadata(metadata -> msg.ack(metadata) + .thenAccept(consumer(x -> getObservationMetadata(metadata) + .ifPresent(obs -> { + obs.onMessageAck(msg); + context.complete(obs); + }))))) + .thenApply(msg -> msg.withNackWithMetadata((reason, metadata) -> { + getObservationMetadata(metadata).ifPresent(consumer(obs -> { + obs.onMessageNack(msg, extractReason(reason)); + context.complete(obs); + })); + return msg.nack(reason, metadata); + })); + } else { + return message; + } + }); + } + + static Optional getObservationMetadata(Metadata metadata) { + for (Object item : metadata) { + if (item instanceof MessageObservation) { + return Optional.of((MessageObservation) item); + } + } + return Optional.empty(); + } + + static Throwable extractReason(Throwable reason) { + if (reason instanceof ProcessingException) { + Throwable cause = reason.getCause(); + if (cause instanceof InvocationTargetException) { + cause = ((InvocationTargetException) cause).getTargetException(); + } + return cause; + } + return reason; + } + +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/OutgoingObservationDecorator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/OutgoingObservationDecorator.java new file mode 100644 index 0000000000..fe95894c06 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/OutgoingObservationDecorator.java @@ -0,0 +1,43 @@ +package io.smallrye.reactive.messaging.providers.extension; + +import static io.smallrye.reactive.messaging.providers.extension.ObservationDecorator.decorateObservation; + +import java.util.List; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.ChannelRegistry; +import io.smallrye.reactive.messaging.SubscriberDecorator; +import io.smallrye.reactive.messaging.observation.MessageObservationCollector; + +@ApplicationScoped +public class OutgoingObservationDecorator implements SubscriberDecorator { + + @Inject + @ConfigProperty(name = "smallrye.messaging.observation.enabled", defaultValue = "true") + boolean enabled; + + @Inject + ChannelRegistry registry; + + @Inject + Instance> observationCollector; + + @Override + public Multi> decorate(Multi> multi, List channelName, + boolean isConnector) { + String channel = channelName.isEmpty() ? null : channelName.get(0); + boolean isEmitter = registry.getEmitterNames().contains(channel); + if (observationCollector.isResolvable() && enabled && !isEmitter && isConnector) { + return decorateObservation(observationCollector.get(), multi, channel, false, false); + } + return multi; + } + +} diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java index dcd1257547..6dec4d27f9 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java @@ -32,6 +32,8 @@ import io.smallrye.reactive.messaging.providers.extension.LegacyEmitterFactoryImpl; import io.smallrye.reactive.messaging.providers.extension.MediatorManager; import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl; +import io.smallrye.reactive.messaging.providers.extension.ObservationDecorator; +import io.smallrye.reactive.messaging.providers.extension.OutgoingObservationDecorator; import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension; import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory; import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories; @@ -121,7 +123,9 @@ public void setUp() { LegacyEmitterFactoryImpl.class, OutgoingInterceptorDecorator.class, IncomingInterceptorDecorator.class, - + // Observation Decorator + ObservationDecorator.class, + OutgoingObservationDecorator.class, // SmallRye config io.smallrye.config.inject.ConfigProducer.class); diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/inject/EmitterObservationTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/inject/EmitterObservationTest.java new file mode 100644 index 0000000000..0b628306d9 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/inject/EmitterObservationTest.java @@ -0,0 +1,131 @@ +package io.smallrye.reactive.messaging.inject; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.junit.jupiter.api.Test; + +import io.smallrye.reactive.messaging.WeldTestBaseWithoutTails; +import io.smallrye.reactive.messaging.observation.DefaultMessageObservation; +import io.smallrye.reactive.messaging.observation.MessageObservation; +import io.smallrye.reactive.messaging.observation.MessageObservationCollector; +import io.smallrye.reactive.messaging.observation.ObservationContext; + +public class EmitterObservationTest extends WeldTestBaseWithoutTails { + + @Test + void testObservationPointsWhenEmittingPayloads() { + addBeanClass(MyMessageObservationCollector.class); + addBeanClass(MyComponentWithAnEmitterOfPayload.class); + + initialize(); + + MyMessageObservationCollector observation = container.select(MyMessageObservationCollector.class).get(); + MyComponentWithAnEmitterOfPayload component = get(MyComponentWithAnEmitterOfPayload.class); + + component.emit(1); + component.emit(2); + component.emit(3); + + await().untilAsserted(() -> assertThat(observation.getObservations()).hasSize(3)); + assertThat(observation.getObservations()).allSatisfy(obs -> { + assertThat(obs.isDone()).isTrue(); + assertThat(obs.getReason()).isNull(); + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1); + }); + + } + + @Test + void testObservationPointsWhenEmittingMessages() { + addBeanClass(MyComponentWithAnEmitterOfMessage.class); + addBeanClass(MyMessageObservationCollector.class); + + initialize(); + + MyMessageObservationCollector observation = container.select(MyMessageObservationCollector.class).get(); + MyComponentWithAnEmitterOfMessage component = get(MyComponentWithAnEmitterOfMessage.class); + + component.emit(Message.of(1)); + component.emit(Message.of(2)); + component.emit(Message.of(3)); + + await().untilAsserted(() -> assertThat(observation.getObservations()).hasSize(3)); + assertThat(observation.getObservations()).allSatisfy(obs -> { + assertThat(obs.isDone()).isTrue(); + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1); + }); + + assertThat(observation.getObservations().get(2).getReason()).isInstanceOf(IllegalArgumentException.class); + + } + + @ApplicationScoped + public static class MyComponentWithAnEmitterOfPayload { + + @Inject + @Channel("output") + Emitter emitter; + + public void emit(int i) { + emitter.send(i); + } + + @Incoming("output") + public void consume(int i) { + // do nothing. + } + + } + + @ApplicationScoped + public static class MyComponentWithAnEmitterOfMessage { + + @Inject + @Channel("output") + Emitter emitter; + + public void emit(Message i) { + emitter.send(i); + } + + @Incoming("output") + public void consume(int i, MessageObservation mo) { + assertThat(mo).isNotNull(); + if (i == 3) { + throw new IllegalArgumentException("boom"); + } + } + + } + + @ApplicationScoped + public static class MyMessageObservationCollector implements MessageObservationCollector { + + private final List observations = new CopyOnWriteArrayList<>(); + + public List getObservations() { + return observations; + } + + @Override + public MessageObservation onNewMessage(String channel, Message message, ObservationContext ctx) { + MessageObservation observation = new DefaultMessageObservation(channel); + observations.add(observation); + return observation; + } + } +} diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/connectors/ObservationTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/connectors/ObservationTest.java new file mode 100644 index 0000000000..e707a75076 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/connectors/ObservationTest.java @@ -0,0 +1,301 @@ +package io.smallrye.reactive.messaging.providers.connectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.WeldTestBaseWithoutTails; +import io.smallrye.reactive.messaging.observation.DefaultMessageObservation; +import io.smallrye.reactive.messaging.observation.MessageObservation; +import io.smallrye.reactive.messaging.observation.MessageObservationCollector; +import io.smallrye.reactive.messaging.observation.ObservationContext; + +public class ObservationTest extends WeldTestBaseWithoutTails { + + @BeforeEach + void setupConfig() { + installConfig("src/test/resources/config/observation.properties"); + } + + @Test + void testMessageObservationPointsFromPayloadConsumer() { + addBeanClass(MyMessageObservationCollector.class); + addBeanClass(MyPayloadConsumer.class); + + initialize(); + + MyMessageObservationCollector observation = container.select(MyMessageObservationCollector.class).get(); + MyPayloadConsumer consumer = container.select(MyPayloadConsumer.class).get(); + + await().until(() -> observation.getObservations().size() == 3); + await().until(() -> consumer.received().size() == 3); + + assertThat(observation.getObservations()).allSatisfy(obs -> { + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime()); + }); + + assertThat(observation.getObservations().get(0).getReason()).isNull(); + assertThat(observation.getObservations().get(1).getReason()).isInstanceOf(IOException.class); + assertThat(observation.getObservations().get(2).getReason()).isInstanceOf(MalformedURLException.class); + } + + @Test + void testMessageObservationPointsFromMessageConsumer() { + addBeanClass(MyMessageObservationCollector.class); + addBeanClass(MyMessageConsumer.class); + + initialize(); + + MyMessageObservationCollector observation = container.select(MyMessageObservationCollector.class).get(); + MyMessageConsumer consumer = container.select(MyMessageConsumer.class).get(); + + await().until(() -> observation.getObservations().size() == 3); + await().until(() -> consumer.received().size() == 3); + + assertThat(observation.getObservations()).allSatisfy(obs -> { + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime()); + }); + + assertThat(observation.getObservations().get(0).getReason()).isNull(); + assertThat(observation.getObservations().get(1).getReason()).isInstanceOf(IOException.class); + assertThat(observation.getObservations().get(2).getReason()).isInstanceOf(MalformedURLException.class); + } + + @Test + void testMessageObservationPointsFromMessageProducer() { + addBeanClass(MyMessageObservationCollector.class); + addBeanClass(MyMessageProducer.class); + + initialize(); + + MyMessageObservationCollector observation = container.select(MyMessageObservationCollector.class).get(); + MyMessageProducer producer = container.select(MyMessageProducer.class).get(); + + await().until(() -> observation.getObservations().size() == 3); + await().until(() -> producer.acked().size() == 3); + + assertThat(observation.getObservations()).allSatisfy(obs -> { + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getReason()).isNull(); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime()); + }); + } + + @Test + void testMessageObservationPointsFromPayloadProducer() { + addBeanClass(MyMessageObservationCollector.class); + addBeanClass(MyPayloadProducer.class); + + initialize(); + + MyMessageObservationCollector observation = container.select(MyMessageObservationCollector.class).get(); + MyPayloadProducer producer = container.select(MyPayloadProducer.class).get(); + + await().until(() -> observation.getObservations().size() == 3); + + assertThat(observation.getObservations()).allSatisfy(obs -> { + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getReason()).isNull(); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime()); + }); + } + + @Test + void testMessageObservationPointsFromPayloadEmitter() { + addBeanClass(MyMessageObservationCollector.class); + addBeanClass(MyEmitterProducer.class); + + initialize(); + + MyMessageObservationCollector observation = container.select(MyMessageObservationCollector.class).get(); + MyEmitterProducer producer = container.select(MyEmitterProducer.class).get(); + + producer.produce(); + + await().until(() -> observation.getObservations().size() == 3); + + assertThat(observation.getObservations()).allSatisfy(obs -> { + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getReason()).isNull(); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime()); + }); + } + + @Test + void testMessageObservationPointsFromMessageEmitter() { + addBeanClass(MyMessageObservationCollector.class); + addBeanClass(MyEmitterMessageProducer.class); + + initialize(); + + MyMessageObservationCollector observation = container.select(MyMessageObservationCollector.class).get(); + MyEmitterMessageProducer producer = container.select(MyEmitterMessageProducer.class).get(); + + producer.produceMessages(); + + await().until(() -> observation.getObservations().size() == 3); + await().until(() -> producer.acked().size() == 3); + + assertThat(observation.getObservations()).allSatisfy(obs -> { + assertThat(obs.getCreationTime()).isNotEqualTo(-1); + assertThat(obs.getReason()).isNull(); + assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime()); + }); + } + + @ApplicationScoped + public static class MyPayloadConsumer { + private final List received = new CopyOnWriteArrayList<>(); + + @Incoming("A") + void consume(int payload, MessageObservation tracking) throws IOException { + received.add(payload); + assertThat(tracking).isNotNull(); + if (payload == 3) { + throw new IOException(); + } + if (payload == 4) { + throw new MalformedURLException(); + } + } + + public List received() { + return received; + } + } + + @ApplicationScoped + public static class MyMessageConsumer { + + private final List received = new CopyOnWriteArrayList<>(); + + @Incoming("A") + CompletionStage consume(Message msg) { + int payload = msg.getPayload(); + received.add(payload); + assertThat(msg.getMetadata(MessageObservation.class)).isNotEmpty(); + if (payload == 3) { + return msg.nack(new IOException()); + } + if (payload == 4) { + return msg.nack(new MalformedURLException()); + } + return msg.ack(); + } + + public List received() { + return received; + } + } + + @ApplicationScoped + public static class MyPayloadProducer { + + private final List acked = new CopyOnWriteArrayList<>(); + + @Outgoing("B") + Multi produce() { + return Multi.createFrom().items(1, 2, 3); + } + + public List acked() { + return acked; + } + } + + @ApplicationScoped + public static class MyMessageProducer { + + private final List acked = new CopyOnWriteArrayList<>(); + + @Outgoing("B") + Multi> produce() { + return Multi.createFrom().items(1, 2, 3) + .map(i -> Message.of(i, () -> { + acked.add(i); + return CompletableFuture.completedFuture(null); + })); + } + + public List acked() { + return acked; + } + } + + @ApplicationScoped + public static class MyEmitterProducer { + + @Inject + @Channel("B") + Emitter emitter; + + void produce() { + emitter.send(1); + emitter.send(2); + emitter.send(3); + } + + } + + @ApplicationScoped + public static class MyEmitterMessageProducer { + + private final List acked = new CopyOnWriteArrayList<>(); + + @Inject + @Channel("B") + Emitter emitter; + + void produceMessages() { + for (int i = 0; i < 3; i++) { + int j = i; + emitter.send(Message.of(j, () -> { + acked.add(j); + return CompletableFuture.completedFuture(null); + })); + } + } + + public List acked() { + return acked; + } + } + + @ApplicationScoped + public static class MyMessageObservationCollector implements MessageObservationCollector { + + private final List observations = new CopyOnWriteArrayList<>(); + + @Override + public MessageObservation onNewMessage(String channel, Message message, ObservationContext ctx) { + MessageObservation observation = new DefaultMessageObservation(channel); + observations.add(observation); + return observation; + } + + public List getObservations() { + return observations; + } + + } +} diff --git a/smallrye-reactive-messaging-provider/src/test/resources/config/observation.properties b/smallrye-reactive-messaging-provider/src/test/resources/config/observation.properties new file mode 100644 index 0000000000..202e52bab8 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/resources/config/observation.properties @@ -0,0 +1,4 @@ +# You should not be able to use the same channel name in an outgoing and incoming configuration +mp.messaging.incoming.A.connector=dummy +mp.messaging.outgoing.B.connector=dummy +