From 946f99f60c02b4268cee90c5e4236b60510da426 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Sat, 14 Oct 2023 11:18:47 +0200 Subject: [PATCH] Connector contribution guide --- documentation/mkdocs.yml | 1 + documentation/pom.xml | 53 +++ .../src/main/docs/concepts/concepts.md | 2 + .../src/main/docs/concepts/connectors.md | 3 +- .../docs/concepts/contributing-connectors.md | 391 ++++++++++++++++++ .../main/java/connectors/MyAckHandler.java | 25 ++ .../src/main/java/connectors/MyConnector.java | 75 ++++ .../connectors/MyConnectorWithPartials.java | 106 +++++ .../java/connectors/MyFailureHandler.java | 27 ++ .../java/connectors/MyIncomingChannel.java | 57 +++ .../MyIncomingChannelWithPartials.java | 79 ++++ .../java/connectors/MyIncomingMetadata.java | 34 ++ .../java/connectors/MyInjectableMessage.java | 28 ++ .../src/main/java/connectors/MyMessage.java | 51 +++ .../java/connectors/MyMessageConverter.java | 27 ++ .../java/connectors/MyOutgoingChannel.java | 49 +++ .../MyOutgoingChannelWithPartials.java | 105 +++++ .../java/connectors/MyOutgoingMessage.java | 70 ++++ .../java/connectors/MyOutgoingMetadata.java | 56 +++ .../java/connectors/api/BrokerClient.java | 21 + .../java/connectors/api/ConsumedMessage.java | 23 ++ .../main/java/connectors/api/PublishAck.java | 4 + .../main/java/connectors/api/SendMessage.java | 46 +++ .../java/connectors/test/MyConnectorTest.java | 52 +++ .../java/connectors/test/WeldTestBase.java | 140 +++++++ .../tracing/MyAttributesExtractor.java | 70 ++++ .../tracing/MyOpenTelemetryInstrumenter.java | 64 +++ .../main/java/connectors/tracing/MyTrace.java | 52 +++ .../tracing/MyTraceTextMapGetter.java | 27 ++ .../tracing/MyTraceTextMapSetter.java | 18 + 30 files changed, 1754 insertions(+), 2 deletions(-) create mode 100644 documentation/src/main/docs/concepts/contributing-connectors.md create mode 100644 documentation/src/main/java/connectors/MyAckHandler.java create mode 100644 documentation/src/main/java/connectors/MyConnector.java create mode 100644 documentation/src/main/java/connectors/MyConnectorWithPartials.java create mode 100644 documentation/src/main/java/connectors/MyFailureHandler.java create mode 100644 documentation/src/main/java/connectors/MyIncomingChannel.java create mode 100644 documentation/src/main/java/connectors/MyIncomingChannelWithPartials.java create mode 100644 documentation/src/main/java/connectors/MyIncomingMetadata.java create mode 100644 documentation/src/main/java/connectors/MyInjectableMessage.java create mode 100644 documentation/src/main/java/connectors/MyMessage.java create mode 100644 documentation/src/main/java/connectors/MyMessageConverter.java create mode 100644 documentation/src/main/java/connectors/MyOutgoingChannel.java create mode 100644 documentation/src/main/java/connectors/MyOutgoingChannelWithPartials.java create mode 100644 documentation/src/main/java/connectors/MyOutgoingMessage.java create mode 100644 documentation/src/main/java/connectors/MyOutgoingMetadata.java create mode 100644 documentation/src/main/java/connectors/api/BrokerClient.java create mode 100644 documentation/src/main/java/connectors/api/ConsumedMessage.java create mode 100644 documentation/src/main/java/connectors/api/PublishAck.java create mode 100644 documentation/src/main/java/connectors/api/SendMessage.java create mode 100644 documentation/src/main/java/connectors/test/MyConnectorTest.java create mode 100644 documentation/src/main/java/connectors/test/WeldTestBase.java create mode 100644 documentation/src/main/java/connectors/tracing/MyAttributesExtractor.java create mode 100644 documentation/src/main/java/connectors/tracing/MyOpenTelemetryInstrumenter.java create mode 100644 documentation/src/main/java/connectors/tracing/MyTrace.java create mode 100644 documentation/src/main/java/connectors/tracing/MyTraceTextMapGetter.java create mode 100644 documentation/src/main/java/connectors/tracing/MyTraceTextMapSetter.java diff --git a/documentation/mkdocs.yml b/documentation/mkdocs.yml index 8e08ea299d..92cbcc39b3 100644 --- a/documentation/mkdocs.yml +++ b/documentation/mkdocs.yml @@ -14,6 +14,7 @@ nav: - 'Development Model': concepts/model.md - 'Emitters and Channel' : concepts/emitter.md - 'Connectors' : concepts/connectors.md + - 'Contributing Connectors' : concepts/contributing-connectors.md - 'Acknowledgement': concepts/acknowledgement.md - 'Blocking Processing': concepts/blocking.md - 'Method Signatures': concepts/signatures.md diff --git a/documentation/pom.xml b/documentation/pom.xml index 9add62bcd6..8a1a7fab95 100644 --- a/documentation/pom.xml +++ b/documentation/pom.xml @@ -25,6 +25,15 @@ + + io.smallrye.config + smallrye-config + + + io.smallrye.reactive + smallrye-connector-attribute-processor + ${project.version} + io.smallrye.reactive smallrye-reactive-messaging-provider @@ -87,6 +96,34 @@ ${camel.version} + + io.smallrye.reactive + test-common + ${project.version} + + compile + + + org.jboss.weld.se + weld-se-shaded + ${weld.version} + + compile + + + org.jboss.weld + weld-core-impl + ${weld.version} + + compile + + + org.awaitility + awaitility + + compile + + org.junit.jupiter junit-jupiter-api @@ -165,6 +202,22 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + ${project.build.directory}/generated-sources/ + + + io.smallrye.reactive.messaging.connector.ConnectorAttributeProcessor + + + org.jboss.logging.processor.apt.LoggingToolsProcessor + + + + + org.apache.maven.plugins maven-install-plugin diff --git a/documentation/src/main/docs/concepts/concepts.md b/documentation/src/main/docs/concepts/concepts.md index 9a015a3269..664228bac9 100644 --- a/documentation/src/main/docs/concepts/concepts.md +++ b/documentation/src/main/docs/concepts/concepts.md @@ -1,3 +1,5 @@ +# Concepts + When dealing with event-driven or data streaming applications, there are a few concepts and vocabulary to introduce. diff --git a/documentation/src/main/docs/concepts/connectors.md b/documentation/src/main/docs/concepts/connectors.md index 05711d380e..2801463662 100644 --- a/documentation/src/main/docs/concepts/connectors.md +++ b/documentation/src/main/docs/concepts/connectors.md @@ -145,8 +145,7 @@ mp.messaging.outgoing.data.acks=1 the connector’s name and set the `connector` attribute for each channel managed by this connector. - -# Connector attribute table +## Connector attribute table In the connector documentation, you will find a table listing the attribute supported by the connector. Be aware that attributes for diff --git a/documentation/src/main/docs/concepts/contributing-connectors.md b/documentation/src/main/docs/concepts/contributing-connectors.md new file mode 100644 index 0000000000..94c356ccff --- /dev/null +++ b/documentation/src/main/docs/concepts/contributing-connectors.md @@ -0,0 +1,391 @@ +# Connector Contribution Guide + +A connector implementation is a CDI-managed bean, typically an `@ApplicationScoped` bean, +which is identified by the `@Connector` identifier. +In order to provide inbound and outbound channels, the connector implements two interfaces `InboundConnector` and `OutboundConnector` respectively. +In addition to that, the connector bean is annotated with `@ConnectorAttribute`, which describes attributes to configure channels. + +!!! abstract "Maven Archetype" + Smallrye Reactive Messaging provides a Maven archetype to bootstrap a new connector. + You can generate a new connector project with the code described in this guide using: + ```shell + mvn -N archetype:generate \ + -DarchetypeGroupId=io.smallrye.reactive \ + -DarchetypeArtifactId=smallrye-reactive-messaging-connector-archetype \ + -DarchetypeVersion={{ attributes['project-version'] }} \ + -DgroupId=io.smallrye.reactive \ + -Dpackage=io.smallrye.reactive.messaging.my \ + -Dversion={{ attributes['project-version'] }} \ + -DartifactId=smallrye-reactive-messaging-my \ + -DconnectorName=my + ``` +The following is an example of a connector skeleton : + +``` java +{{ insert('connectors/MyConnector.java') }} +``` + +Note that the `getPublisher` and `getSubscriber` methods receive MicroProfile Config `Config` instance and wrap it with +`MyConnectorIncomingConfiguration` and `MyConnectorOutgoingConfiguration` objects. + +These custom channel configuration types ease getting channel configuration, including the optional or default values. +They are generated by the `smallrye-connector-attribute-processor` annotation processor, and can be configured in project pom like the following: + +```xml + + + ${project.groupId} + smallrye-reactive-messaging-provider + ${project.version} + + + io.smallrye.reactive + smallrye-connector-attribute-processor + ${project.version} + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${project.build.directory}/generated-sources/ + + + io.smallrye.reactive.messaging.connector.ConnectorAttributeProcessor + + + org.jboss.logging.processor.apt.LoggingToolsProcessor + + + + + + +``` + +The `smallrye-reactive-messaging-provider` is the minimum required dependency for a connector implementation. +You'll also note that the `LoggingToolsProcessor` annotation processor is also configured. +This enables generating internationalized log statements and exceptions. +Typically, you would create following interfaces in `i18n` sub-package: `[Connector]Exceptions`, `[Connector]Logging` and `[Connector]Messages`. +More information can be found in [JBoss Logging Tools documentation](https://jboss-logging.github.io/jboss-logging-tools/). + +## Implementing Inbound Channels + +The `InboundConnector` implementation returns, for a given channel configuration, a reactive stream of `Message`s. +The returned reactive stream is an instance of `Flow.Publisher` and typically can be implemented using Mutiny `Multi` type. + +!!! note "`IncomingConnectorFactory`" + The inbound channels can also implement the `IncomingConnectorFactory` from the MicroProfile Reactive Messaging specification. + However, the `PublisherBuilder` type can be more challenging to work with and + Smallrye Reactive Messaging converts the provided stream to Mutiny types to do the wiring anyway. + +The returned `Flow.Publisher` would allow controlling the flow of ingestion using backpressure. +It would be preferable to use pull-based APIs of the underlying messaging library to receive messages from the message broker. +You can refer to the [`Mutiny` "How to use polling?" guide](https://smallrye.io/smallrye-mutiny/latest/guides/polling/) +to construct a `Multi` using Mutiny APIs, or implement the `Flow.Subscription` from scratch and wrap it in an `AbstractMulti`. + +Here is an example channel implementation which constructs the reactive stream using the polling API: +``` java +{{ insert('connectors/MyIncomingChannel.java') }} +``` + +### Connector Threading and Vert.x + +Whether the external API call is blocking or non-blocking, managing the thread on which the message processing will be dispatched can be challenging. +Smallrye Reactive Messaging depends on [Eclipse Vert.x](https://vertx.io/) to consistently dispatch messages in event-loop or worker threads, +propagating the message context along different processing stages. +You can read more on [Message Context](message-context.md) and [Vert.x Context](https://vertx.io/blog/an-introduction-to-the-vert-x-context-object/). + +Some connectors already use Vert.x clients, such as RabbitMQ, AMQP 1.0 or MQTT. +Other connectors such as Kafka or Pulsar directly use the client library of the messaging technology, +therefore they create a Vert.x Context per channel to dispatch messages on that context. +Connectors can access the `Vertx` instance by injecting the `ExecutionHolder` bean. +[Mutiny operators `runSubscribtionOn` and `emitOn`](https://smallrye.io/smallrye-mutiny/latest/guides/emit-on-vs-run-subscription-on/) can be used to switch threads the events are dispatched on. + +### Custom Message and Metadata implementations + +Reactive messaging {{ javadoc('org.eclipse.microprofile.reactive.messaging.Message', False, 'io.smallrye.reactive/smallrye-reactive-messaging-api') }} +type is a thin wrapper around a payload and some metadata, which lets implementing acknowledgment and negative acknowledgment of that message. + +Messages received from the underlying library very often return with a custom type, +wrapping the payload and some properties such as key, timestamp, schema information, or any other metadata. + +!!! tip + The client may propose different strategies for consuming messages individually or in batches. + If a batch consuming is available the incoming channel may receive wrap and dispatch message as a batch or individually. + +While it is possible to use `Message.of` builder methods to wrap the incoming message type, +a custom type implementing `Message` interface helps to deal with different aspects we'll cover later, +such as deserialization, message acknowledgment or tracing. + +An example message implementation would be like the following: +``` java +{{ insert('connectors/MyMessage.java') }} +``` + +Note that `MyMessage` implements the `ContextAwareMessage`. +In the constructor `captureContextMetadata` helper method is used to capture the Vert.x context which created the object and capturing it into the `LocalContextMetadata`. +This metadata allows running [each message in its own Vert.x context, supporting context propagation](message-context.md). + +The `MyMessage` type implements the accessors for the `metadata` and `payload` from the `Message` interface. +If the messaging technology doesn't have a built-in unmarshalling mechanism, the message can deserialize the raw payload to a primitive or a complex object. + +!!! warning + The custom message implementation is usually not the type consumed by the application injecting channels. + Applications usually inject in the payload, the raw consumed type (in the above example the `ConsumedMessage`), + or some other type provided by the [`MessageConverter`s](converters.md). + Handling of `Message` types by the application is restricted only to advanced use cases, because handling of message acknowledgment is manual + Even then the message may be intercepted before and changed, conserving the `metadata`, `ack` and `nack` handlers but not the original type created by the connector. + +The `MyIncomingMetadata` gives access to the underlying consumed message attributes, and applications can inject this object for accessing message details: +``` java +{{ insert('connectors/MyIncomingMetadata.java') }} +``` + +Also note that `ack` and `nack` method implementations are delegated to handler objects. +This allows configuring different strategies at channel level. + +### Acknowledgment strategies + +The [acknowledgement](acknowledgement.md) is the way for message consumers to inform the broker that the message has been successfully received and processed. +Depending on the messaging technology the broker then can decide to remove the message from the queue, flag as consumed or purge it completely. +In Reactive Messaging there are different policies to trigger the acknowledgement but the canonical one is to acknowledge a message when the processing (potentially asynchronous) has completed (`POST_PROCESSING`). + +The Reactive Messaging defines `Message#ack` method as non-blocking asynchronous, returning a `CompletionStage`, +because potentially the acknowledgement action sends a network call to the broker. + +The following example simply calls the client `ack` method using the Mutiny `Uni` and switch the emitter to the Message context. +Returning back to the message context is essential for chaining asynchronous actions without losing the context and for keeping the consistency on message consumption flow. + +``` java +{{ insert('connectors/MyAckHandler.java') }} +``` + +While this ack handler strategy acknowledges each message to the broker, +the messaging technology can allow employing different strategies for acknowledging messages. +For example an ack strategy can track processed messages and acknowledge them altogether or call a different client side endpoint to acknowledge the message batch. + +### Failure handling strategies + +The failure handling, or the [negative acknowledgment](acknowledgement.md#negative-acknowledgement) allows indicating that a message was not processed correctly. +Similar to the acknowledgment the Reactive Messaging defines `Message#nack(Throwable reason, Metadata metadata)` method as non-blocking asynchronous, returning a `CompletionStage`. + +``` java +{{ insert('connectors/MyFailureHandler.java') }} +``` + +Different failure handling strategies can, for example, +- Ignore the failure, log and call the `ack` instead +- Send the message to a dead letter queue and call the `ack` +- Employ a different strategy depending on the `Metadata` associated with the `nack` method call. + +### Message Converters + +The connector can propose default [`MessageConverter`](converters.md) implementations for converting the payload to a custom type. +As an example the following converter extracts the `CustomMessage` and puts it in the payload: +``` java +{{ insert('connectors/MyMessageConverter.java') }} +``` + +## Implementing Outbound Channels + +The `OutboundConnector` implementation returns, for a given channel configuration, a `Flow.Subscriber` of messages. +This is typically implemented by a custom `Flow.Processor` and using the `MultiUtils.via` helper methods to apply message transformations. + +!!! note "`OutgoingConnectorFactory`" + The outbound channels can also implement the `OutgoingConnectorFactory` from the MicroProfile Reactive Messaging specification. + However, it is usually more friendly to work with the `MultiUtils.via` methods to construct and transform outgoing messages. + +Here is an example outgoing channel implementation: +``` java +{{ insert('connectors/MyOutgoingChannel.java') }} +``` +The `MultiUtils.via` helper method allows using the `Multi` chaining methods and in the same time provides a `Flow.Subscriber`. +However, this implementation allows sending messages one at a time: +one only after the previous outgoing message send is completed. + +Some messaging technologies provide publish receipt, +a message back from the broker to the sender that asynchronously acknowledges the sent operation. +In this case the connector can only be sure of the send operation when it receives the publish receipt of that message. +Some technologies may provide blocking sending calls, +in that case the connector needs to delegate the sending call to a worker thread. + +Depending on whether the client supports multiple in-flight outgoing messages, you can also use a `SenderProcessor`, +which allows receiving configuration for the maximum number of in-flight messages and whether it waits for completion (publish receipt from the broker): + +``` java +{{ insert('connectors/MyOutgoingChannelWithPartials.java', 'sender-processor') }} +``` + +Other more advanced scenarios can be implemented to retry the transmission in case of a retryable failure, +or batch multiple outgoing messages to a single send operation. + +### Outgoing `Message` Builder + +In order to convey all attributes of the outgoing message to the client library +connectors provide outgoing `Message` implementation and a corresponding outgoing message metadata. +These allow the application developer to build the message attributes that will be sent to the broker. + +``` java +{{ insert('connectors/MyOutgoingMessage.java') }} +``` + +``` java +{{ insert('connectors/MyOutgoingMetadata.java') }} +``` + +The outgoing channel implementation then will construct the client library object that represents the outbound message, `SendMessage` in this example: + +``` java +{{ insert('connectors/MyOutgoingChannelWithPartials.java', 'send-message') }} +``` + +It is a best practice to also allow the application to return a payload of the client outbound library object (`SendMessage`). + +### Outgoing message acknowledgement + +Because the Reactive Messaging [chains acknowledgements](acknowledgement.md#chain-of-acknowledgment) from incoming message until the outgoing message, +it is crucial for the outgoing channel to correctly ack and nack the message. + +## Smallrye Health Integration + +Smallrye Reactive Messaging allows connectors to integrate with Smallrye Health to contribute channel state to the health reports. +Connectors need to implement the `HealthReporter` interface and implement some or all of the `getReadiness`, `getLiveness` and `getStartup` methods: + +``` java +{{ insert('connectors/MyConnectorWithPartials.java', 'health-report') }} +``` + +Implementing health reports per channel depends on what information is available to the connector. +For more information on different health check probes you can check out [Configure Liveness, Readiness and Startup Probes +](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/) + +You may want to add a connector attribute to enable/disable the health reporting per channel: +```java +import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; + +@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true") +``` + +## OpenTelemetry Tracing Integration + +Smallrye Reactive Messaging allows connectors to easily integrate with OpenTelemetry tracing. +It propagates the tracing context from inbound messages and to outbound messages. +The `smallrye-reactive-messaging-otel` module provides necessary dependencies to the OpenTelemetry artifacts and also provides `TracingUtils` helper class for setting up the tracing. + +```xml + + io.smallrye.reactive.messaging + smallrye-reactive-messaging-otel + +``` + +For integrating tracing you'd need to create a couple of classes: + +- Holder class for tracing information +- Implementation of `io.opentelemetry.context.propagation.TextMapGetter`, which retrieves for a given key the value of a tracing attributed from the holder object +- Implementation of `io.opentelemetry.context.propagation.TextMapSetter`, which sets on the holder object the key and value of tracing attributes +- Implementations of `io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor` and `io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter` + +Then you'd need to configure instrumenters per incoming and outgoing channel: + +``` java +{{ insert('connectors/tracing/MyOpenTelemetryInstrumenter.java', 'create-instrumenter') }} +``` + +Finally, you'd need to configure instrumenters per incoming and outgoing channels and wire the call to instrumenter using `TracingUtils`. + +For an incoming channel, you'd need to call the instrumenter on an inbound message: + +``` java +{{ insert('connectors/MyIncomingChannelWithPartials.java', 'incoming-tracing') }} +``` + +For an outgoing channel, you'd need to call the instrumenter on constructing the outbound message: + +``` java +{{ insert('connectors/MyOutgoingChannelWithPartials.java', 'outgoing-tracing') }} +``` + +You may want to add a connector attribute to enable/disable the tracing per channel: +```java +import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; + +@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true") +``` + +## Testing the connector + +While unit tests are highly encouraged for validating ad-hoc logic in connector code, +by nature connector tests are mostly integration tests validating the correct configuration and functioning of channels. +Most of the time tests need to run against a broker instance. +This instance can be mocked or embedded in the test JVM, +or provisioned in a container runtime using [Testcontainers](https://testcontainers.org). +The testcontainers approach is encouraged as it'll provide a testing environment closest to reality. + +It may take too much time and resources to start a broker per test method or per test class, +so may want to share the same broker instance between all test classes. +In that case you can checkout how to write a [JUnit 5 Extension](https://junit.org/junit5/docs/current/user-guide/#extensions) +and start only one container instance in the beginning of tests and stop it at the end of all the tests. + +There are essentially two ways of creating the connector behavior to test against: + +1. Instantiating channels directly by passing the custom configuration. +With this you can get the Reactive stream directly from the channel implementation and send/receive messages. +You can use [AssertSubscriber from Mutiny](https://smallrye.io/smallrye-mutiny/2.5.1/guides/testing/) to regulate demand and write assertions. +2. CDI-based tests which write configuration and instantiate application beans. +You can use [Weld, the reference implementation of CDI specification](https://weld.cdi-spec.org/) with configured set of beans and extensions: +``` java +{{ insert('connectors/test/WeldTestBase.java') }} +``` + +You would need following test dependencies for enabling Weld in tests: +```xml + + io.smallrye.reactive + test-common + test + + + org.jboss.weld.se + weld-se-shaded + ${weld.version} + test + + + org.jboss.weld + weld-core-impl + ${weld.version} + test + + + org.awaitility + awaitility + test + +``` + +Your test classes can therefore extend the `WeldTestBase` and provide configuration and application beans: + +``` java +{{ insert('connectors/test/MyConnectorTest.java') }} +``` + +!!! tip "Awaitility" + Because connector tests are usually asynchronous, [awaitility](https://github.com/awaitility/awaitility) + provides a DSL to await on expressed assertions. + +### Common tests for validating the connector + +- Message consumption through incoming channels +- Message producing through outgoing channels +- Ack and failure handler strategies test +- Message Context propagation test `LocalPropagationTest` +- `HealthCheckTest` +- `MessageConverterTest` +- `TracingPropagationTest` +- Configuration test +- Authentication test +- Tests for diff --git a/documentation/src/main/java/connectors/MyAckHandler.java b/documentation/src/main/java/connectors/MyAckHandler.java new file mode 100644 index 0000000000..dc1d1d146e --- /dev/null +++ b/documentation/src/main/java/connectors/MyAckHandler.java @@ -0,0 +1,25 @@ +package connectors; + +import java.util.concurrent.CompletionStage; + +import connectors.api.BrokerClient; +import io.smallrye.mutiny.Uni; + +public class MyAckHandler { + + private final BrokerClient client; + + static MyAckHandler create(BrokerClient client) { + return new MyAckHandler(client); + } + + public MyAckHandler(BrokerClient client) { + this.client = client; + } + + public CompletionStage handle(MyMessage msg) { + return Uni.createFrom().completionStage(client.ack(msg.getConsumedMessage())) + .emitOn(msg::runOnMessageContext) + .subscribeAsCompletionStage(); + } +} diff --git a/documentation/src/main/java/connectors/MyConnector.java b/documentation/src/main/java/connectors/MyConnector.java new file mode 100644 index 0000000000..9dd75beb8a --- /dev/null +++ b/documentation/src/main/java/connectors/MyConnector.java @@ -0,0 +1,75 @@ +package connectors; + +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.spi.Connector; + +import connectors.api.BrokerClient; +import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; +import io.smallrye.reactive.messaging.connector.InboundConnector; +import io.smallrye.reactive.messaging.connector.OutboundConnector; +import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; +import io.vertx.mutiny.core.Vertx; + +@ApplicationScoped +@Connector(MyConnector.CONNECTOR_NAME) +@ConnectorAttribute(name = "client-id", type = "string", direction = INCOMING_AND_OUTGOING, description = "The client id ", mandatory = true) +@ConnectorAttribute(name = "buffer-size", type = "int", direction = INCOMING, description = "The size buffer of incoming messages waiting to be processed", defaultValue = "128") +@ConnectorAttribute(name = "topic", type = "string", direction = OUTGOING, description = "The default topic to send the messages, defaults to channel name if not set") +@ConnectorAttribute(name = "maxPendingMessages", type = "int", direction = OUTGOING, description = "The maximum size of a queue holding pending messages", defaultValue = "1000") +@ConnectorAttribute(name = "waitForWriteCompletion", type = "boolean", direction = OUTGOING, description = "Whether the outgoing channel waits for the write completion", defaultValue = "true") +public class MyConnector implements InboundConnector, OutboundConnector { + + public static final String CONNECTOR_NAME = "smallrye-my-connector"; + + @Inject + ExecutionHolder executionHolder; + + Vertx vertx; + + List incomingChannels = new CopyOnWriteArrayList<>(); + List outgoingChannels = new CopyOnWriteArrayList<>(); + + @PostConstruct + void init() { + this.vertx = executionHolder.vertx(); + } + + @Override + public Flow.Publisher> getPublisher(Config config) { + MyConnectorIncomingConfiguration ic = new MyConnectorIncomingConfiguration(config); + String channelName = ic.getChannel(); + String clientId = ic.getClientId(); + int bufferSize = ic.getBufferSize(); + // ... + BrokerClient client = BrokerClient.create(clientId); + MyIncomingChannel channel = new MyIncomingChannel(vertx, ic, client); + incomingChannels.add(channel); + return channel.getStream(); + } + + @Override + public Flow.Subscriber> getSubscriber(Config config) { + MyConnectorOutgoingConfiguration oc = new MyConnectorOutgoingConfiguration(config); + String channelName = oc.getChannel(); + String clientId = oc.getClientId(); + int pendingMessages = oc.getMaxPendingMessages(); + // ... + BrokerClient client = BrokerClient.create(clientId); + MyOutgoingChannel channel = new MyOutgoingChannel(vertx, oc, client); + outgoingChannels.add(channel); + return channel.getSubscriber(); + } +} diff --git a/documentation/src/main/java/connectors/MyConnectorWithPartials.java b/documentation/src/main/java/connectors/MyConnectorWithPartials.java new file mode 100644 index 0000000000..aac4028d57 --- /dev/null +++ b/documentation/src/main/java/connectors/MyConnectorWithPartials.java @@ -0,0 +1,106 @@ +package connectors; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.spi.Connector; + +import connectors.api.BrokerClient; +import io.smallrye.reactive.messaging.connector.InboundConnector; +import io.smallrye.reactive.messaging.connector.OutboundConnector; +import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.health.HealthReporter; +import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; +import io.vertx.mutiny.core.Vertx; + +// +@ApplicationScoped +@Connector(MyConnectorWithPartials.CONNECTOR_NAME) +public class MyConnectorWithPartials implements InboundConnector, OutboundConnector, HealthReporter { + + public static final String CONNECTOR_NAME = "smallrye-my-connector"; + + List incomingChannels = new CopyOnWriteArrayList<>(); + List outgoingChannels = new CopyOnWriteArrayList<>(); + + @Override + public HealthReport getReadiness() { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + for (MyIncomingChannel channel : incomingChannels) { + builder.add(channel.getChannel(), true); + } + for (MyOutgoingChannel channel : outgoingChannels) { + builder.add(channel.getChannel(), true); + } + return builder.build(); + } + + @Override + public HealthReport getLiveness() { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + for (MyIncomingChannel channel : incomingChannels) { + builder.add(channel.getChannel(), true); + } + for (MyOutgoingChannel channel : outgoingChannels) { + builder.add(channel.getChannel(), true); + } + return builder.build(); + } + + @Override + public HealthReport getStartup() { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + for (MyIncomingChannel channel : incomingChannels) { + builder.add(channel.getChannel(), true); + } + for (MyOutgoingChannel channel : outgoingChannels) { + builder.add(channel.getChannel(), true); + } + return builder.build(); + } + + // + + @Inject + ExecutionHolder executionHolder; + + Vertx vertx; + + @PostConstruct + void init() { + this.vertx = executionHolder.vertx(); + } + + @Override + public Flow.Publisher> getPublisher(Config config) { + MyConnectorIncomingConfiguration ic = new MyConnectorIncomingConfiguration(config); + String channelName = ic.getChannel(); + String clientId = ic.getClientId(); + int bufferSize = ic.getBufferSize(); + // ... + BrokerClient client = BrokerClient.create(clientId); + MyIncomingChannel channel = new MyIncomingChannel(vertx, ic, client); + incomingChannels.add(channel); + return channel.getStream(); + } + + @Override + public Flow.Subscriber> getSubscriber(Config config) { + MyConnectorOutgoingConfiguration oc = new MyConnectorOutgoingConfiguration(config); + String channelName = oc.getChannel(); + String clientId = oc.getClientId(); + int pendingMessages = oc.getMaxPendingMessages(); + // ... + BrokerClient client = BrokerClient.create(clientId); + MyOutgoingChannel channel = new MyOutgoingChannel(vertx, oc, client); + outgoingChannels.add(channel); + return channel.getSubscriber(); + } +} diff --git a/documentation/src/main/java/connectors/MyFailureHandler.java b/documentation/src/main/java/connectors/MyFailureHandler.java new file mode 100644 index 0000000000..f41b78055c --- /dev/null +++ b/documentation/src/main/java/connectors/MyFailureHandler.java @@ -0,0 +1,27 @@ +package connectors; + +import java.util.concurrent.CompletionStage; + +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import connectors.api.BrokerClient; +import io.smallrye.mutiny.Uni; + +public class MyFailureHandler { + + private final BrokerClient client; + + static MyFailureHandler create(BrokerClient client) { + return new MyFailureHandler(client); + } + + public MyFailureHandler(BrokerClient client) { + this.client = client; + } + + public CompletionStage handle(MyMessage msg, Throwable reason, Metadata metadata) { + return Uni.createFrom().completionStage(() -> client.reject(msg.getConsumedMessage(), reason.getMessage())) + .emitOn(msg::runOnMessageContext) + .subscribeAsCompletionStage(); + } +} diff --git a/documentation/src/main/java/connectors/MyIncomingChannel.java b/documentation/src/main/java/connectors/MyIncomingChannel.java new file mode 100644 index 0000000000..deded1eaa8 --- /dev/null +++ b/documentation/src/main/java/connectors/MyIncomingChannel.java @@ -0,0 +1,57 @@ +package connectors; + +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import connectors.api.BrokerClient; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.health.HealthReport; +import io.vertx.core.impl.VertxInternal; +import io.vertx.mutiny.core.Context; +import io.vertx.mutiny.core.Vertx; + +public class MyIncomingChannel { + + private final String channel; + private final BrokerClient client; + private final Context context; + private final MyAckHandler ackHandler; + private final MyFailureHandler failureHandler; + private final AtomicBoolean closed = new AtomicBoolean(false); + private Flow.Publisher> stream; + + public MyIncomingChannel(Vertx vertx, MyConnectorIncomingConfiguration cfg, BrokerClient client) { + // create and configure the client with MyConnectorIncomingConfiguration + this.channel = cfg.getChannel(); + this.client = client; + this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); + this.ackHandler = MyAckHandler.create(this.client); + this.failureHandler = MyFailureHandler.create(this.client); + this.stream = Multi.createBy().repeating() + .uni(() -> Uni.createFrom().completionStage(this.client.poll())) + .until(__ -> closed.get()) + .emitOn(context::runOnContext) + .map(consumed -> new MyMessage<>(consumed, ackHandler, failureHandler)); + } + + public String getChannel() { + return channel; + } + + public Flow.Publisher> getStream() { + return this.stream; + } + + public void close() { + closed.compareAndSet(false, true); + client.close(); + } + + void isReady(HealthReport.HealthReportBuilder healthReportBuilder) { + + } + +} diff --git a/documentation/src/main/java/connectors/MyIncomingChannelWithPartials.java b/documentation/src/main/java/connectors/MyIncomingChannelWithPartials.java new file mode 100644 index 0000000000..5e51b4b108 --- /dev/null +++ b/documentation/src/main/java/connectors/MyIncomingChannelWithPartials.java @@ -0,0 +1,79 @@ +package connectors; + +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import connectors.api.BrokerClient; +import connectors.api.ConsumedMessage; +import connectors.tracing.MyOpenTelemetryInstrumenter; +import connectors.tracing.MyTrace; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.tracing.TracingUtils; +import io.vertx.core.impl.VertxInternal; +import io.vertx.mutiny.core.Context; +import io.vertx.mutiny.core.Vertx; + +public class MyIncomingChannelWithPartials { + + private final String channel; + private final BrokerClient client; + private final Context context; + private final MyAckHandler ackHandler; + private final MyFailureHandler failureHandler; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final boolean tracingEnabled; + private Flow.Publisher> stream; + + public MyIncomingChannelWithPartials(Vertx vertx, MyConnectorIncomingConfiguration cfg, BrokerClient client) { + // create and configure the client with MyConnectorIncomingConfiguration + this.channel = cfg.getChannel(); + this.client = client; + this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); + this.ackHandler = MyAckHandler.create(this.client); + this.failureHandler = MyFailureHandler.create(this.client); + this.tracingEnabled = true; + // + Multi> receiveMulti = Multi.createBy().repeating() + .uni(() -> Uni.createFrom().completionStage(this.client.poll())) + .until(__ -> closed.get()) + .emitOn(context::runOnContext) + .map(consumed -> new MyMessage<>(consumed, ackHandler, failureHandler)); + + Instrumenter instrumenter = MyOpenTelemetryInstrumenter.createInstrumenter(true); + if (tracingEnabled) { + receiveMulti = receiveMulti.map(message -> { + ConsumedMessage consumedMessage = message.getMetadata(MyIncomingMetadata.class).get().getCustomMessage(); + return TracingUtils.traceIncoming(instrumenter, message, new MyTrace.Builder() + .withClientId(consumedMessage.clientId()) + .withTopic(consumedMessage.topic()) + .withProperties(consumedMessage.properties()) + .build()); + }); + } + // + this.stream = receiveMulti; + } + + public String getChannel() { + return channel; + } + + public Flow.Publisher> getStream() { + return this.stream; + } + + public void close() { + closed.compareAndSet(false, true); + client.close(); + } + + void isReady(HealthReport.HealthReportBuilder healthReportBuilder) { + + } + +} diff --git a/documentation/src/main/java/connectors/MyIncomingMetadata.java b/documentation/src/main/java/connectors/MyIncomingMetadata.java new file mode 100644 index 0000000000..c288085be9 --- /dev/null +++ b/documentation/src/main/java/connectors/MyIncomingMetadata.java @@ -0,0 +1,34 @@ +package connectors; + +import java.util.Map; + +import connectors.api.ConsumedMessage; + +public class MyIncomingMetadata { + + private final ConsumedMessage msg; + + public MyIncomingMetadata(ConsumedMessage msg) { + this.msg = msg; + } + + public ConsumedMessage getCustomMessage() { + return msg; + } + + public T getBody() { + return msg.body(); + } + + public String getKey() { + return msg.key(); + } + + public long getTimestamp() { + return msg.timestamp(); + } + + public Map getProperties() { + return msg.properties(); + } +} diff --git a/documentation/src/main/java/connectors/MyInjectableMessage.java b/documentation/src/main/java/connectors/MyInjectableMessage.java new file mode 100644 index 0000000000..623abb6045 --- /dev/null +++ b/documentation/src/main/java/connectors/MyInjectableMessage.java @@ -0,0 +1,28 @@ +package connectors; + +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import connectors.api.ConsumedMessage; +import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; +import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; + +public class MyInjectableMessage implements ContextAwareMessage, MetadataInjectableMessage { + + private final T payload; + Metadata metadata; + + public MyInjectableMessage(ConsumedMessage message) { + this.payload = message.body(); + this.metadata = ContextAwareMessage.captureContextMetadata(new MyIncomingMetadata<>(message)); + } + + @Override + public T getPayload() { + return payload; + } + + @Override + public void injectMetadata(Object metadataObject) { + this.metadata = this.metadata.with(metadataObject); + } +} diff --git a/documentation/src/main/java/connectors/MyMessage.java b/documentation/src/main/java/connectors/MyMessage.java new file mode 100644 index 0000000000..cd432f0f3b --- /dev/null +++ b/documentation/src/main/java/connectors/MyMessage.java @@ -0,0 +1,51 @@ +package connectors; + +import java.util.concurrent.CompletionStage; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import connectors.api.ConsumedMessage; +import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; + +public class MyMessage implements Message, ContextAwareMessage { + + private final T payload; + private final Metadata metadata; + private final MyAckHandler ackHandler; + + private final MyFailureHandler nackHandler; + private ConsumedMessage consumed; + + public MyMessage(ConsumedMessage message, MyAckHandler ackHandler, MyFailureHandler nackHandler) { + this.consumed = message; + this.payload = message.body(); + this.ackHandler = ackHandler; + this.nackHandler = nackHandler; + this.metadata = ContextAwareMessage.captureContextMetadata(new MyIncomingMetadata<>(message)); + } + + public ConsumedMessage getConsumedMessage() { + return consumed; + } + + @Override + public T getPayload() { + return payload; + } + + @Override + public Metadata getMetadata() { + return metadata; + } + + @Override + public CompletionStage ack() { + return ackHandler.handle(this); + } + + @Override + public CompletionStage nack(Throwable reason, Metadata nackMetadata) { + return nackHandler.handle(this, reason, nackMetadata); + } +} diff --git a/documentation/src/main/java/connectors/MyMessageConverter.java b/documentation/src/main/java/connectors/MyMessageConverter.java new file mode 100644 index 0000000000..6f9f180e99 --- /dev/null +++ b/documentation/src/main/java/connectors/MyMessageConverter.java @@ -0,0 +1,27 @@ +package connectors; + +import java.lang.reflect.Type; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import connectors.api.ConsumedMessage; +import io.smallrye.reactive.messaging.MessageConverter; +import io.smallrye.reactive.messaging.providers.helpers.TypeUtils; + +@ApplicationScoped +public class MyMessageConverter implements MessageConverter { + @Override + public boolean canConvert(Message in, Type target) { + return TypeUtils.isAssignable(target, ConsumedMessage.class) + && in.getMetadata(MyIncomingMetadata.class).isPresent(); + } + + @Override + public Message convert(Message in, Type target) { + return in.withPayload(in.getMetadata(MyIncomingMetadata.class) + .map(MyIncomingMetadata::getCustomMessage) + .orElse(null)); + } +} diff --git a/documentation/src/main/java/connectors/MyOutgoingChannel.java b/documentation/src/main/java/connectors/MyOutgoingChannel.java new file mode 100644 index 0000000000..013defda10 --- /dev/null +++ b/documentation/src/main/java/connectors/MyOutgoingChannel.java @@ -0,0 +1,49 @@ +package connectors; + +import java.util.concurrent.Flow; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import connectors.api.BrokerClient; +import connectors.api.SendMessage; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; +import io.vertx.mutiny.core.Vertx; + +public class MyOutgoingChannel { + private final String channel; + private Flow.Subscriber> subscriber; + private final BrokerClient client; + private final String topic; + + public MyOutgoingChannel(Vertx vertx, MyConnectorOutgoingConfiguration oc, BrokerClient client) { + this.channel = oc.getChannel(); + this.client = client; + this.topic = oc.getTopic().orElse(oc.getChannel()); + this.subscriber = MultiUtils.via(multi -> multi.call(m -> publishMessage(this.client, m))); + } + + private Uni publishMessage(BrokerClient client, Message m) { + // construct the outgoing message + SendMessage sendMessage = new SendMessage(); + Object payload = m.getPayload(); + sendMessage.setPayload(payload); + sendMessage.setTopic(topic); + m.getMetadata(MyOutgoingMetadata.class).ifPresent(out -> { + sendMessage.setTopic(out.getTopic()); + sendMessage.setKey(out.getKey()); + //... + }); + return Uni.createFrom().completionStage(() -> client.send(sendMessage)) + .onItem().transformToUni(receipt -> Uni.createFrom().completionStage(m.ack())) + .onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(m.nack(t))); + } + + public Flow.Subscriber> getSubscriber() { + return this.subscriber; + } + + public String getChannel() { + return this.channel; + } +} diff --git a/documentation/src/main/java/connectors/MyOutgoingChannelWithPartials.java b/documentation/src/main/java/connectors/MyOutgoingChannelWithPartials.java new file mode 100644 index 0000000000..b153304ce7 --- /dev/null +++ b/documentation/src/main/java/connectors/MyOutgoingChannelWithPartials.java @@ -0,0 +1,105 @@ +package connectors; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Flow; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import connectors.api.BrokerClient; +import connectors.api.SendMessage; +import connectors.tracing.MyOpenTelemetryInstrumenter; +import connectors.tracing.MyTrace; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; +import io.smallrye.reactive.messaging.providers.helpers.SenderProcessor; +import io.smallrye.reactive.messaging.tracing.TracingUtils; +import io.vertx.mutiny.core.Vertx; + +public class MyOutgoingChannelWithPartials { + private SenderProcessor processor; + private Flow.Subscriber> subscriber; + private final BrokerClient client; + private final String topic; + private final boolean tracingEnabled; + + private Instrumenter instrumenter; + + public MyOutgoingChannelWithPartials(Vertx vertx, MyConnectorOutgoingConfiguration oc, BrokerClient client) { + this.client = client; + this.topic = oc.getTopic().orElse(oc.getChannel()); + this.tracingEnabled = true; + // + long requests = oc.getMaxPendingMessages(); + boolean waitForWriteCompletion = oc.getWaitForWriteCompletion(); + if (requests <= 0) { + requests = Long.MAX_VALUE; + } + this.processor = new SenderProcessor(requests, waitForWriteCompletion, m -> publishMessage(client, m)); + this.subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(f -> { + // log the failure + })); + // + instrumenter = MyOpenTelemetryInstrumenter.createInstrumenter(false); + + } + + // + private Uni publishMessage(BrokerClient client, Message message) { + // construct the outgoing message + SendMessage sendMessage; + Object payload = message.getPayload(); + if (payload instanceof SendMessage) { + sendMessage = (SendMessage) message.getPayload(); + } else { + sendMessage = new SendMessage(); + sendMessage.setPayload(payload); + sendMessage.setTopic(topic); + message.getMetadata(MyOutgoingMetadata.class).ifPresent(out -> { + sendMessage.setTopic(out.getTopic()); + sendMessage.setKey(out.getKey()); + //... + }); + } + return Uni.createFrom().completionStage(() -> client.send(sendMessage)) + .onItem().transformToUni(receipt -> Uni.createFrom().completionStage(message.ack())) + .onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(message.nack(t))); + } + // + + // + private Uni publishMessageWithTracing(BrokerClient client, Message message) { + // construct the outgoing message + SendMessage sendMessage; + Object payload = message.getPayload(); + if (payload instanceof SendMessage) { + sendMessage = (SendMessage) message.getPayload(); + } else { + sendMessage = new SendMessage(); + sendMessage.setPayload(payload); + sendMessage.setTopic(topic); + message.getMetadata(MyOutgoingMetadata.class).ifPresent(out -> { + sendMessage.setTopic(out.getTopic()); + sendMessage.setKey(out.getKey()); + //... + }); + } + if (tracingEnabled) { + Map properties = new HashMap<>(); + TracingUtils.traceOutgoing(instrumenter, message, new MyTrace.Builder() + .withProperties(properties) + .withTopic(sendMessage.getTopic()) + .build()); + sendMessage.setProperties(properties); + } + return Uni.createFrom().completionStage(() -> client.send(sendMessage)) + .onItem().transformToUni(receipt -> Uni.createFrom().completionStage(message.ack())) + .onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(message.nack(t))); + } + // + + public Flow.Subscriber> getSubscriber() { + return this.subscriber; + } +} diff --git a/documentation/src/main/java/connectors/MyOutgoingMessage.java b/documentation/src/main/java/connectors/MyOutgoingMessage.java new file mode 100644 index 0000000000..86c8a3636e --- /dev/null +++ b/documentation/src/main/java/connectors/MyOutgoingMessage.java @@ -0,0 +1,70 @@ +package connectors; + +import java.util.concurrent.CompletionStage; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; + +public class MyOutgoingMessage implements Message, ContextAwareMessage { + + private final T payload; + private final Metadata metadata; + + private final Supplier> ack; + private final Function> nack; + + public static MyOutgoingMessage from(Message message) { + return new MyOutgoingMessage<>(message.getPayload(), message.getMetadata(), message.getAck(), message.getNack()); + } + + public MyOutgoingMessage(T payload, Metadata metadata, + Supplier> ack, + Function> nack) { + this.payload = payload; + this.metadata = metadata; + this.ack = ack; + this.nack = nack; + } + + public MyOutgoingMessage(T payload, String key, String topic, + Supplier> ack, + Function> nack) { + this(payload, Metadata.of(new MyOutgoingMetadata(topic, key)), ack, nack); + } + + @Override + public T getPayload() { + return payload; + } + + @Override + public Metadata getMetadata() { + return metadata; + } + + @Override + public Supplier> getAck() { + return this.ack; + } + + @Override + public Function> getNack() { + return this.nack; + } + + public MyOutgoingMessage withKey(String key) { + this.metadata.with(this.metadata.get(MyOutgoingMetadata.class) + .map(m -> MyOutgoingMetadata.builder(m).withKey(key).build())); + return this; + } + + public MyOutgoingMessage withTopic(String topic) { + this.metadata.with(this.metadata.get(MyOutgoingMetadata.class) + .map(m -> MyOutgoingMetadata.builder(m).withTopic(topic).build())); + return this; + } +} diff --git a/documentation/src/main/java/connectors/MyOutgoingMetadata.java b/documentation/src/main/java/connectors/MyOutgoingMetadata.java new file mode 100644 index 0000000000..15c47cc1b8 --- /dev/null +++ b/documentation/src/main/java/connectors/MyOutgoingMetadata.java @@ -0,0 +1,56 @@ +package connectors; + +public class MyOutgoingMetadata { + private String topic; + private String key; + + public static MyOutgoingMetadataBuilder builder() { + return new MyOutgoingMetadataBuilder(); + } + + public static MyOutgoingMetadataBuilder builder(MyOutgoingMetadata metadata) { + return new MyOutgoingMetadataBuilder(metadata); + } + + public MyOutgoingMetadata(String topic, String key) { + this.topic = topic; + this.key = key; + } + + public String getTopic() { + return topic; + } + + public String getKey() { + return key; + } + + public static class MyOutgoingMetadataBuilder { + private String topic; + private String key; + + public MyOutgoingMetadataBuilder() { + + } + + public MyOutgoingMetadataBuilder(MyOutgoingMetadata metadata) { + this.key = metadata.getKey(); + this.topic = metadata.getTopic(); + } + + public MyOutgoingMetadataBuilder withTopic(String topic) { + this.topic = topic; + return this; + } + + public MyOutgoingMetadataBuilder withKey(String key) { + this.key = key; + return this; + } + + public MyOutgoingMetadata build() { + return new MyOutgoingMetadata(topic, key); + } + } + +} diff --git a/documentation/src/main/java/connectors/api/BrokerClient.java b/documentation/src/main/java/connectors/api/BrokerClient.java new file mode 100644 index 0000000000..af30be8f5c --- /dev/null +++ b/documentation/src/main/java/connectors/api/BrokerClient.java @@ -0,0 +1,21 @@ +package connectors.api; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +public interface BrokerClient { + + static BrokerClient create(String clientId) { + return null; + } + + CompletionStage> poll(); + + CompletionStage send(SendMessage sendMessage); + + CompletableFuture ack(ConsumedMessage msg); + + CompletableFuture reject(ConsumedMessage msg, String reason); + + void close(); +} diff --git a/documentation/src/main/java/connectors/api/ConsumedMessage.java b/documentation/src/main/java/connectors/api/ConsumedMessage.java new file mode 100644 index 0000000000..92edc0669f --- /dev/null +++ b/documentation/src/main/java/connectors/api/ConsumedMessage.java @@ -0,0 +1,23 @@ +package connectors.api; + +import java.util.Map; + +/** + * Message object received from the underlying library + * + * @param payload type + */ +public interface ConsumedMessage { + + String key(); + + long timestamp(); + + T body(); + + Map properties(); + + String topic(); + + String clientId(); +} diff --git a/documentation/src/main/java/connectors/api/PublishAck.java b/documentation/src/main/java/connectors/api/PublishAck.java new file mode 100644 index 0000000000..e93116077f --- /dev/null +++ b/documentation/src/main/java/connectors/api/PublishAck.java @@ -0,0 +1,4 @@ +package connectors.api; + +public interface PublishAck { +} diff --git a/documentation/src/main/java/connectors/api/SendMessage.java b/documentation/src/main/java/connectors/api/SendMessage.java new file mode 100644 index 0000000000..306ca176a5 --- /dev/null +++ b/documentation/src/main/java/connectors/api/SendMessage.java @@ -0,0 +1,46 @@ +package connectors.api; + +import java.util.Map; + +public class SendMessage { + + Object payload; + + String key; + + String topic; + + Map properties; + + public Object getPayload() { + return payload; + } + + public void setPayload(Object payload) { + this.payload = payload; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } +} diff --git a/documentation/src/main/java/connectors/test/MyConnectorTest.java b/documentation/src/main/java/connectors/test/MyConnectorTest.java new file mode 100644 index 0000000000..718aac41f2 --- /dev/null +++ b/documentation/src/main/java/connectors/test/MyConnectorTest.java @@ -0,0 +1,52 @@ +package connectors.test; + +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.junit.jupiter.api.Test; + +import connectors.MyConnector; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class MyConnectorTest extends WeldTestBase { + + @Test + void incomingChannel() { + String host = ""; + int port = 0; + String myTopic = UUID.randomUUID().toString(); + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.data.topic", myTopic) + .with("mp.messaging.incoming.data.host", host) + .with("mp.messaging.incoming.data.port", port) + .with("mp.messaging.incoming.data.connector", MyConnector.CONNECTOR_NAME); + MyApp app = runApplication(config, MyApp.class); + + int expected = 10; + // produce expected number of messages to myTopic + + // wait until app received + await().until(() -> app.received().size() == expected); + } + + @ApplicationScoped + public static class MyApp { + + List received = new CopyOnWriteArrayList<>(); + + @Incoming("data") + void consume(String msg) { + received.add(msg); + } + + public List received() { + return received; + } + } +} diff --git a/documentation/src/main/java/connectors/test/WeldTestBase.java b/documentation/src/main/java/connectors/test/WeldTestBase.java new file mode 100644 index 0000000000..e169eeee9c --- /dev/null +++ b/documentation/src/main/java/connectors/test/WeldTestBase.java @@ -0,0 +1,140 @@ +package connectors.test; + +import jakarta.enterprise.inject.spi.BeanManager; + +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral; +import org.jboss.weld.environment.se.Weld; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +import connectors.MyConnector; +import connectors.MyMessageConverter; +import io.smallrye.config.SmallRyeConfigProviderResolver; +import io.smallrye.config.inject.ConfigExtension; +import io.smallrye.reactive.messaging.providers.MediatorFactory; +import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; +import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry; +import io.smallrye.reactive.messaging.providers.extension.ChannelProducer; +import io.smallrye.reactive.messaging.providers.extension.EmitterFactoryImpl; +import io.smallrye.reactive.messaging.providers.extension.HealthCenter; +import io.smallrye.reactive.messaging.providers.extension.LegacyEmitterFactoryImpl; +import io.smallrye.reactive.messaging.providers.extension.MediatorManager; +import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl; +import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension; +import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory; +import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories; +import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry; +import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator; +import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator; +import io.smallrye.reactive.messaging.providers.wiring.Wiring; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class WeldTestBase { + + protected Weld weld; + protected WeldContainer container; + + @BeforeEach + public void initWeld() { + weld = new Weld(); + + // SmallRye config + ConfigExtension extension = new ConfigExtension(); + weld.addExtension(extension); + + weld.addBeanClass(MediatorFactory.class); + weld.addBeanClass(MediatorManager.class); + weld.addBeanClass(InternalChannelRegistry.class); + weld.addBeanClass(ConnectorFactories.class); + weld.addBeanClass(ConfiguredChannelFactory.class); + weld.addBeanClass(ChannelProducer.class); + weld.addBeanClass(ExecutionHolder.class); + weld.addBeanClass(WorkerPoolRegistry.class); + weld.addBeanClass(HealthCenter.class); + weld.addBeanClass(Wiring.class); + weld.addExtension(new ReactiveMessagingExtension()); + + weld.addBeanClass(EmitterFactoryImpl.class); + weld.addBeanClass(MutinyEmitterFactoryImpl.class); + weld.addBeanClass(LegacyEmitterFactoryImpl.class); + + weld.addBeanClass(MyConnector.class); + weld.addBeanClass(MyMessageConverter.class); + weld.addBeanClass(MetricDecorator.class); + weld.addBeanClass(MicrometerDecorator.class); + weld.disableDiscovery(); + } + + @AfterEach + public void stopContainer() { + if (container != null) { + // TODO Explicitly close the connector + getBeanManager().createInstance() + .select(MyConnector.class, ConnectorLiteral.of(MyConnector.CONNECTOR_NAME)).get(); + container.close(); + } + // Release the config objects + SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig()); + } + + public BeanManager getBeanManager() { + if (container == null) { + runApplication(new MapBasedConfig()); + } + return container.getBeanManager(); + } + + public void addBeans(Class... clazzes) { + weld.addBeanClasses(clazzes); + } + + public T get(Class clazz) { + return getBeanManager().createInstance().select(clazz).get(); + } + + public T runApplication(MapBasedConfig config, Class clazz) { + weld.addBeanClass(clazz); + runApplication(config); + return get(clazz); + } + + public void runApplication(MapBasedConfig config) { + if (config != null) { + config.write(); + } else { + MapBasedConfig.cleanup(); + } + + container = weld.initialize(); + } + + public static void addConfig(MapBasedConfig config) { + if (config != null) { + config.write(); + } else { + MapBasedConfig.cleanup(); + } + } + + public HealthCenter getHealth() { + if (container == null) { + throw new IllegalStateException("Application not started"); + } + return container.getBeanManager().createInstance().select(HealthCenter.class).get(); + } + + public boolean isStarted() { + return getHealth().getStartup().isOk(); + } + + public boolean isReady() { + return getHealth().getReadiness().isOk(); + } + + public boolean isAlive() { + return getHealth().getLiveness().isOk(); + } + +} diff --git a/documentation/src/main/java/connectors/tracing/MyAttributesExtractor.java b/documentation/src/main/java/connectors/tracing/MyAttributesExtractor.java new file mode 100644 index 0000000000..c9d76a1cea --- /dev/null +++ b/documentation/src/main/java/connectors/tracing/MyAttributesExtractor.java @@ -0,0 +1,70 @@ +package connectors.tracing; + +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; + +public class MyAttributesExtractor implements AttributesExtractor { + private final MessagingAttributesGetter messagingAttributesGetter; + + public MyAttributesExtractor() { + this.messagingAttributesGetter = new MyMessagingAttributesGetter(); + } + + @Override + public void onStart(final AttributesBuilder attributes, final Context parentContext, final MyTrace myTrace) { + // fill in attributes from myTrace object + } + + @Override + public void onEnd( + final AttributesBuilder attributes, + final Context context, + final MyTrace myTrace, + final Void unused, + final Throwable error) { + + } + + public MessagingAttributesGetter getMessagingAttributesGetter() { + return messagingAttributesGetter; + } + + private static final class MyMessagingAttributesGetter implements MessagingAttributesGetter { + @Override + public String getSystem(final MyTrace myTrace) { + return "my"; + } + + @Override + public String getDestination(final MyTrace myTrace) { + return myTrace.getTopic(); + } + + @Override + public boolean isTemporaryDestination(final MyTrace myTrace) { + return false; + } + + @Override + public String getConversationId(final MyTrace myTrace) { + return null; + } + + @Override + public Long getMessagePayloadSize(final MyTrace myTrace) { + return null; + } + + @Override + public Long getMessagePayloadCompressedSize(final MyTrace myTrace) { + return null; + } + + @Override + public String getMessageId(final MyTrace myTrace, final Void unused) { + return null; + } + } +} diff --git a/documentation/src/main/java/connectors/tracing/MyOpenTelemetryInstrumenter.java b/documentation/src/main/java/connectors/tracing/MyOpenTelemetryInstrumenter.java new file mode 100644 index 0000000000..99d320ee68 --- /dev/null +++ b/documentation/src/main/java/connectors/tracing/MyOpenTelemetryInstrumenter.java @@ -0,0 +1,64 @@ +package connectors.tracing; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.smallrye.reactive.messaging.tracing.TracingUtils; + +/** + * Encapsulates the OpenTelemetry instrumentation API so that those classes are only needed if + * users explicitly enable tracing. + */ +public class MyOpenTelemetryInstrumenter { + + private final Instrumenter instrumenter; + + private MyOpenTelemetryInstrumenter(Instrumenter instrumenter) { + this.instrumenter = instrumenter; + } + + public static MyOpenTelemetryInstrumenter createIncoming() { + return new MyOpenTelemetryInstrumenter(createInstrumenter(true)); + } + + public static MyOpenTelemetryInstrumenter createOutgoing() { + return new MyOpenTelemetryInstrumenter(createInstrumenter(false)); + } + + // + public static Instrumenter createInstrumenter(boolean incoming) { + MessageOperation messageOperation = incoming ? MessageOperation.RECEIVE : MessageOperation.PUBLISH; + + MyAttributesExtractor myExtractor = new MyAttributesExtractor(); + MessagingAttributesGetter attributesGetter = myExtractor.getMessagingAttributesGetter(); + var spanNameExtractor = MessagingSpanNameExtractor.create(attributesGetter, messageOperation); + InstrumenterBuilder builder = Instrumenter.builder(GlobalOpenTelemetry.get(), + "io.smallrye.reactive.messaging", spanNameExtractor); + var attributesExtractor = MessagingAttributesExtractor.create(attributesGetter, messageOperation); + + builder + .addAttributesExtractor(attributesExtractor) + .addAttributesExtractor(myExtractor); + + if (incoming) { + return builder.buildConsumerInstrumenter(MyTraceTextMapGetter.INSTANCE); + } else { + return builder.buildProducerInstrumenter(MyTraceTextMapSetter.INSTANCE); + } + } + // + + public Message traceIncoming(Message message, MyTrace myTrace, boolean makeCurrent) { + return TracingUtils.traceIncoming(instrumenter, message, myTrace, makeCurrent); + } + + public void traceOutgoing(Message message, MyTrace myTrace) { + TracingUtils.traceOutgoing(instrumenter, message, myTrace); + } +} diff --git a/documentation/src/main/java/connectors/tracing/MyTrace.java b/documentation/src/main/java/connectors/tracing/MyTrace.java new file mode 100644 index 0000000000..ed6873d5c8 --- /dev/null +++ b/documentation/src/main/java/connectors/tracing/MyTrace.java @@ -0,0 +1,52 @@ +package connectors.tracing; + +import java.util.Map; + +public class MyTrace { + private final String clientId; + private final String topic; + private final Map messageProperties; + + private MyTrace(String clientId, String topic, Map messageProperties) { + this.clientId = clientId; + this.topic = topic; + this.messageProperties = messageProperties; + } + + public String getClientId() { + return clientId; + } + + public String getTopic() { + return topic; + } + + public Map getMessageProperties() { + return messageProperties; + } + + public static class Builder { + private String clientId; + private String topic; + private Map properties; + + public Builder withClientId(String clientId) { + this.clientId = clientId; + return this; + } + + public Builder withTopic(String topic) { + this.topic = topic; + return this; + } + + public Builder withProperties(Map properties) { + this.properties = properties; + return this; + } + + public MyTrace build() { + return new MyTrace(clientId, topic, properties); + } + } +} diff --git a/documentation/src/main/java/connectors/tracing/MyTraceTextMapGetter.java b/documentation/src/main/java/connectors/tracing/MyTraceTextMapGetter.java new file mode 100644 index 0000000000..e35f9298ef --- /dev/null +++ b/documentation/src/main/java/connectors/tracing/MyTraceTextMapGetter.java @@ -0,0 +1,27 @@ +package connectors.tracing; + +import java.util.ArrayList; +import java.util.Map; + +import io.opentelemetry.context.propagation.TextMapGetter; + +public enum MyTraceTextMapGetter implements TextMapGetter { + INSTANCE; + + @Override + public Iterable keys(final MyTrace carrier) { + Map headers = carrier.getMessageProperties(); + return new ArrayList<>(headers.keySet()); + } + + @Override + public String get(final MyTrace carrier, final String key) { + if (carrier != null) { + Map properties = carrier.getMessageProperties(); + if (properties != null) { + return properties.get(key); + } + } + return null; + } +} diff --git a/documentation/src/main/java/connectors/tracing/MyTraceTextMapSetter.java b/documentation/src/main/java/connectors/tracing/MyTraceTextMapSetter.java new file mode 100644 index 0000000000..ff52b45ffd --- /dev/null +++ b/documentation/src/main/java/connectors/tracing/MyTraceTextMapSetter.java @@ -0,0 +1,18 @@ +package connectors.tracing; + +import java.util.Map; + +import io.opentelemetry.context.propagation.TextMapSetter; + +public enum MyTraceTextMapSetter implements TextMapSetter { + INSTANCE; + + @Override + public void set(final MyTrace carrier, final String key, final String value) { + if (carrier != null) { + // fill message properties from key, value attribute + Map properties = carrier.getMessageProperties(); + properties.put(key, value); + } + } +}