From a44d99551e02d1e281f9ba1af0e313cf99ac0bc8 Mon Sep 17 00:00:00 2001
From: holomekc <30546982+holomekc@users.noreply.github.com>
Date: Sun, 1 Oct 2023 00:37:29 +0200
Subject: [PATCH 01/14] Start with AWS SQS implementation. Sending is in theory
implemented. A lot of things are missing though: logging, finalize tracing,
error handling, retry, config. After that polling could be implemented. I am
afraid that the tracing cannot be implemented properly due to reactive, but
we will see. Tests are missing completely.
---
pom.xml | 1 +
smallrye-reactive-messaging-aws/pom.xml | 95 ++++++++++++++
.../smallrye-reactive-messaging-sqs/pom.xml | 29 +++++
.../reactive/messaging/aws/sqs/Clients.java | 5 +
.../messaging/aws/sqs/SqsConnector.java | 119 ++++++++++++++++++
.../messaging/aws/sqs/SqsOutgoingChannel.java | 111 ++++++++++++++++
.../reactive/messaging/aws/sqs/Target.java | 37 ++++++
.../aws/sqs/action/GetQueueUrlAction.java | 25 ++++
.../sqs/action/SendBatchMessageAction.java | 92 ++++++++++++++
.../aws/sqs/action/SendMessageAction.java | 69 ++++++++++
.../messaging/aws/sqs/cache/TargetCache.java | 28 +++++
.../aws/sqs/client/SqsClientFactory.java | 16 +++
.../aws/sqs/client/SqsClientHolder.java | 36 ++++++
.../aws/sqs/config/ConfigResolver.java | 4 +
.../messaging/aws/sqs/i18n/SqsExceptions.java | 27 ++++
.../messaging/aws/sqs/i18n/SqsLogging.java | 18 +++
.../messaging/aws/sqs/message/SqsMessage.java | 27 ++++
.../aws/sqs/message/SqsMessageMetadata.java | 47 +++++++
.../aws/sqs/message/SqsOutgoingMessage.java | 43 +++++++
.../message/SqsOutgoingMessageMetadata.java | 5 +
.../sqs/tracing/SqsAttributesExtractor.java | 24 ++++
.../aws/sqs/tracing/SqsInstrumenter.java | 28 +++++
.../tracing/SqsMessagingAttributesGetter.java | 54 ++++++++
.../messaging/aws/sqs/tracing/SqsTrace.java | 44 +++++++
.../sqs/tracing/SqsTraceTextMapSetter.java | 14 +++
.../messaging/aws/sqs/util/Helper.java | 16 +++
.../messaging/tracing/TracingUtils.java | 2 +
27 files changed, 1016 insertions(+)
create mode 100644 smallrye-reactive-messaging-aws/pom.xml
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/pom.xml
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/Clients.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutgoingChannel.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/Target.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/GetQueueUrlAction.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/SendBatchMessageAction.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/SendMessageAction.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/cache/TargetCache.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/client/SqsClientFactory.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/client/SqsClientHolder.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/config/ConfigResolver.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/i18n/SqsExceptions.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/i18n/SqsLogging.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsMessage.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsMessageMetadata.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsOutgoingMessage.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsOutgoingMessageMetadata.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsAttributesExtractor.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsInstrumenter.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsMessagingAttributesGetter.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTrace.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTraceTextMapSetter.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/util/Helper.java
diff --git a/pom.xml b/pom.xml
index 1356f9504f..ad3cb72d26 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,7 @@
smallrye-reactive-messaging-rabbitmq
smallrye-reactive-messaging-gcp-pubsub
smallrye-reactive-messaging-pulsar
+ smallrye-reactive-messaging-aws
examples/quickstart
examples/kafka-quickstart
examples/kafka-quickstart-kotlin
diff --git a/smallrye-reactive-messaging-aws/pom.xml b/smallrye-reactive-messaging-aws/pom.xml
new file mode 100644
index 0000000000..22e69b760f
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/pom.xml
@@ -0,0 +1,95 @@
+
+
+ 4.0.0
+
+
+ io.smallrye.reactive
+ smallrye-reactive-messaging
+ 4.11.0-SNAPSHOT
+
+
+ smallrye-reactive-messaging-aws
+
+ SmallRye Reactive Messaging : Connector :: AWS
+
+ pom
+
+
+ 2.20.157
+
+
+
+ smallrye-reactive-messaging-sqs
+
+
+
+
+
+ software.amazon.awssdk
+ bom
+ ${awssdk.version}
+ pom
+ import
+
+
+
+
+
+
+ ${project.groupId}
+ smallrye-reactive-messaging-provider
+ ${project.version}
+
+
+
+ io.smallrye.config
+ smallrye-config
+ test
+
+
+ io.smallrye.reactive
+ smallrye-connector-attribute-processor
+ ${project.version}
+ provided
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ ${project.build.directory}/generated-sources/
+
+
+ io.smallrye.reactive.messaging.connector.ConnectorAttributeProcessor
+
+
+ org.jboss.logging.processor.apt.LoggingToolsProcessor
+
+
+
+
+
+
+
+
+
+ coverage
+
+ @{jacocoArgLine}
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+
+
+
+
+
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/pom.xml b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/pom.xml
new file mode 100644
index 0000000000..3cce937233
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/pom.xml
@@ -0,0 +1,29 @@
+
+
+ 4.0.0
+
+ io.smallrye.reactive
+ smallrye-reactive-messaging-aws
+ 4.11.0-SNAPSHOT
+
+
+ smallrye-reactive-messaging-sqs
+
+ SmallRye Reactive Messaging : Connector :: AWS SQS
+
+
+
+ software.amazon.awssdk
+ sqs
+
+
+ io.smallrye.reactive
+ smallrye-reactive-messaging-otel
+ ${project.version}
+
+
+
+
+
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/Clients.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/Clients.java
new file mode 100644
index 0000000000..f06f438835
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/Clients.java
@@ -0,0 +1,5 @@
+package io.smallrye.reactive.messaging.aws.sqs;
+
+public class Clients {
+
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java
new file mode 100644
index 0000000000..a2bad4c407
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java
@@ -0,0 +1,119 @@
+package io.smallrye.reactive.messaging.aws.sqs;
+
+import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
+import io.smallrye.reactive.messaging.aws.sqs.cache.TargetCache;
+import io.smallrye.reactive.messaging.aws.sqs.client.SqsClientHolder;
+import io.smallrye.reactive.messaging.connector.InboundConnector;
+import io.smallrye.reactive.messaging.connector.OutboundConnector;
+import io.smallrye.reactive.messaging.health.HealthReporter;
+import io.smallrye.reactive.messaging.json.JsonMapping;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.BeforeDestroyed;
+import jakarta.enterprise.event.Observes;
+import jakarta.enterprise.event.Reception;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.eclipse.microprofile.reactive.messaging.spi.Connector;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Flow;
+
+import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING;
+import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING;
+import static io.smallrye.reactive.messaging.aws.sqs.client.SqsClientFactory.createSqsClient;
+import static io.smallrye.reactive.messaging.aws.sqs.i18n.SqsExceptions.ex;
+import static io.smallrye.reactive.messaging.aws.sqs.i18n.SqsLogging.log;
+
+@ApplicationScoped
+@Connector(SqsConnector.CONNECTOR_NAME)
+// common
+@ConnectorAttribute(name = "queue", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the SQS queue. If not set, the channel name is used")
+@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")
+@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true")
+
+// outgoing
+@ConnectorAttribute(name = "send.batch.enabled", type = "boolean", direction = OUTGOING, description = "Send messages in batches.", defaultValue = "false")
+
+// incomming
+public class SqsConnector implements InboundConnector, OutboundConnector, HealthReporter {
+
+ static final String CONNECTOR_NAME = "smallrye-aws-sqs";
+
+ private final Map clients = new ConcurrentHashMap<>();
+ private final Map clientsByChannel = new ConcurrentHashMap<>();
+ private final List outgoingChannels = new CopyOnWriteArrayList<>();
+ // private final List> incomingChannels = new CopyOnWriteArrayList<>();
+
+ @Inject
+ Instance jsonMapper;
+ private JsonMapping jsonMapping;
+
+ @PostConstruct
+ public void init() {
+ if (jsonMapper.isUnsatisfied()) {
+ log.warn(
+ "Please add one of the additional mapping modules (-jsonb or -jackson) to be able to (de)serialize JSON messages.");
+ } else if (jsonMapper.isAmbiguous()) {
+ log.warn(
+ "Please select only one of the additional mapping modules (-jsonb or -jackson) to be able to (de)serialize JSON messages.");
+ this.jsonMapping = jsonMapper.stream().findFirst()
+ .orElseThrow(() -> new RuntimeException("Unable to find JSON Mapper"));
+ } else {
+ this.jsonMapping = jsonMapper.get();
+ }
+ }
+
+ @Override
+ public Flow.Publisher extends Message>> getPublisher(Config config) {
+ SqsConnectorIncomingConfiguration ic = new SqsConnectorIncomingConfiguration(config);
+
+ SqsAsyncClient client = clients.computeIfAbsent("", ignored -> createSqsClient(ic));
+ clientsByChannel.put(ic.getChannel(), client);
+
+ return null;
+ }
+
+ @Override
+ public Flow.Subscriber extends Message>> getSubscriber(Config config) {
+ SqsConnectorOutgoingConfiguration oc = new SqsConnectorOutgoingConfiguration(config);
+
+ SqsAsyncClient client = clients.computeIfAbsent("", ignored -> createSqsClient(oc));
+ clientsByChannel.put(oc.getChannel(), client);
+
+ try {
+ SqsOutgoingChannel channel = new SqsOutgoingChannel(new SqsClientHolder<>(client, oc, jsonMapping, new TargetCache()));
+ outgoingChannels.add(channel);
+ return channel.getSubscriber();
+ } catch (SqsException e) {
+ throw ex.illegalStateUnableToBuildConsumer(e);
+ }
+ }
+
+ public void terminate(
+ @Observes(notifyObserver = Reception.IF_EXISTS)
+ @Priority(50)
+ @BeforeDestroyed(ApplicationScoped.class) Object event) {
+ // incomingChannels.forEach(PulsarIncomingChannel::close);
+ outgoingChannels.forEach(SqsOutgoingChannel::close);
+ for (SqsAsyncClient client : clients.values()) {
+ try {
+ client.close();
+ } catch (SqsException e) {
+ log.unableToCloseClient(e);
+ }
+ }
+ // incomingChannels.clear();
+ outgoingChannels.clear();
+ clients.clear();
+ clientsByChannel.clear();
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutgoingChannel.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutgoingChannel.java
new file mode 100644
index 0000000000..cca9ae3487
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutgoingChannel.java
@@ -0,0 +1,111 @@
+package io.smallrye.reactive.messaging.aws.sqs;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.tuples.Tuple2;
+import io.smallrye.reactive.messaging.aws.sqs.action.SendBatchMessageAction;
+import io.smallrye.reactive.messaging.aws.sqs.action.SendMessageAction;
+import io.smallrye.reactive.messaging.aws.sqs.client.SqsClientHolder;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsMessage;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsOutgoingMessage;
+import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsTrace;
+import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
+import io.smallrye.reactive.messaging.tracing.TracingUtils;
+import org.eclipse.microprofile.reactive.messaging.Message;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.Flow;
+
+import static io.smallrye.reactive.messaging.aws.sqs.tracing.SqsInstrumenter.SQS_OUTGOING_INSTRUMENTER;
+
+public class SqsOutgoingChannel {
+
+ private final SqsClientHolder clientHolder;
+ private final Flow.Subscriber extends Message>> subscriber;
+
+ public SqsOutgoingChannel(SqsClientHolder clientHolder) {
+ this.clientHolder = clientHolder;
+
+ if (clientHolder.getConfig().getSendBatchEnabled()) {
+ subscriber = MultiUtils.via(this::createStream);
+ } else {
+ subscriber = MultiUtils.via(this::createStreamWithBatching);
+ }
+ }
+
+ private Multi extends SqsOutgoingMessage>> prepare(Multi> m) {
+ return m
+ .onItem().transform(SqsOutgoingMessage::from)
+ .onItem().transformToUniAndConcatenate(this::addTargetInformation)
+ .onItem().invoke(this::tracing);
+ }
+
+ private Multi createStream(Multi> m) {
+ return prepare(m)
+ .onItem().transformToUniAndConcatenate(this::sendMessage);
+ // .onFailure().invoke(log::unableToSend)
+ }
+
+ private Multi createStreamWithBatching(Multi> m) {
+ // We need to group by target first and then into lists for batching.
+ // Reason is that queue can be overwritten in messages. We cannot group into lists directly. Otherwise,
+ // we would send messages to the wrong queue.
+ return prepare(m)
+ .group().by(SqsMessage::getTarget)
+ .onItem().transformToMultiAndMerge(group -> group
+ .group().intoLists().of(10, Duration.ofMillis(0))
+ .onItem().transform(msg -> Tuple2.of(group.key(), msg)))
+ .onItem().transformToUniAndConcatenate(tuple -> sendBatchMessage(tuple.getItem1(), tuple.getItem2()));
+ }
+
+ private Uni extends SqsOutgoingMessage>> addTargetInformation(SqsOutgoingMessage> msg) {
+ return clientHolder.getTargetCache().getTarget(clientHolder, msg)
+ .onItem().transform(target -> {
+ msg.withTarget(target);
+ return msg;
+ });
+ }
+
+ private Uni sendMessage(SqsOutgoingMessage> message) {
+ final SqsOutgoingMessage> sqsMessage = SqsOutgoingMessage.from(message);
+ return clientHolder.getTargetCache().getTarget(clientHolder, sqsMessage)
+ .onItem().transformToUni(target -> SendMessageAction.sendMessage(clientHolder, sqsMessage));
+ }
+
+ private Uni sendBatchMessage(Target target, List extends SqsOutgoingMessage>> messages) {
+ return SendBatchMessageAction.sendMessage(clientHolder, target, messages);
+ }
+
+ private void tracing(SqsOutgoingMessage> message) {
+ if (clientHolder.getConfig().getTracingEnabled()) {
+ SqsTrace trace = new SqsTrace()
+ .withQueue(message.getTarget().getTargetName())
+ // TODO: Cannot set messageId. It is provided in response.
+ // The intstrumenter documentation says:
+ // Call shouldStart(Context, Object) and do not proceed if it returns false.
+ // Call start(Context, Object) at the beginning of a request.
+ // Call end(Context, Object, Object, Throwable) at the end of a request.
+ // TracingUtils violates this, because end is called immediately and not at the end of a request.
+ // So it would be necessary to update the trace at response and then end the span.
+ // This is not possible.
+ // For batching it starts here and might be delayed extremely. This will not be visible.
+ // Furthermore, it is an issue to set the messageIds. They are available in the action, but
+ // we cannot wait until this happened. Updating messageId in response would be ok as well, but not
+ // possible as mentioned. Or it would be necessary to create the ids here. I do not like that.
+ // .withMessageId("")
+ .withConversationId(message.getSqsMetadata().getConversationId())
+ // We do not set payload size. This would require to calculate it, which is less performant.
+ ;
+ TracingUtils.traceOutgoing(SQS_OUTGOING_INSTRUMENTER, message, trace);
+ }
+ }
+
+ public Flow.Subscriber extends Message>> getSubscriber() {
+ return subscriber;
+ }
+
+ public void close() {
+
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/Target.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/Target.java
new file mode 100644
index 0000000000..756214d4d9
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/Target.java
@@ -0,0 +1,37 @@
+package io.smallrye.reactive.messaging.aws.sqs;
+
+import java.util.Objects;
+
+public class Target {
+
+ private final String targetName;
+ private final String targetUrl;
+
+ public Target(String targetName, String targetUrl) {
+ this.targetName = targetName;
+ this.targetUrl = targetUrl;
+ }
+
+ public String getTargetName() {
+ return targetName;
+ }
+
+ public String getTargetUrl() {
+ return targetUrl;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ Target target = (Target) o;
+ return Objects.equals(targetUrl, target.targetUrl);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(targetUrl);
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/GetQueueUrlAction.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/GetQueueUrlAction.java
new file mode 100644
index 0000000000..10cc6b37fd
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/GetQueueUrlAction.java
@@ -0,0 +1,25 @@
+package io.smallrye.reactive.messaging.aws.sqs.action;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorCommonConfiguration;
+import io.smallrye.reactive.messaging.aws.sqs.client.SqsClientHolder;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsMessage;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsMessageMetadata;
+import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
+import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
+
+public class GetQueueUrlAction {
+
+ public static Uni resolveQueueUrl(
+ final SqsClientHolder> clientHolder, final SqsMessage, M> message) {
+ final SqsConnectorCommonConfiguration config = clientHolder.getConfig();
+ final SqsMessageMetadata sqsMetadata = message.getSqsMetadata();
+
+ return Uni.createFrom().completionStage(
+ clientHolder.getClient().getQueueUrl(GetQueueUrlRequest.builder()
+ .queueName(config.getQueue().orElse(config.getChannel()))
+ .queueOwnerAWSAccountId(sqsMetadata.getQueueOwnerAWSAccountId())
+ .build()))
+ .onItem().transform(GetQueueUrlResponse::queueUrl);
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/SendBatchMessageAction.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/SendBatchMessageAction.java
new file mode 100644
index 0000000000..c2fbd16e08
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/SendBatchMessageAction.java
@@ -0,0 +1,92 @@
+package io.smallrye.reactive.messaging.aws.sqs.action;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
+import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorOutgoingConfiguration;
+import io.smallrye.reactive.messaging.aws.sqs.Target;
+import io.smallrye.reactive.messaging.aws.sqs.client.SqsClientHolder;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsOutgoingMessage;
+import io.smallrye.reactive.messaging.aws.sqs.util.Helper;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static io.smallrye.reactive.messaging.aws.sqs.i18n.SqsExceptions.ex;
+
+public class SendBatchMessageAction {
+
+ public static Uni sendMessage(
+ final SqsClientHolder clientHolder,
+ Target target, final List extends SqsOutgoingMessage>> messages) {
+ final Map entryMap = new HashMap<>();
+ final Map> messageMap = new HashMap<>();
+
+ messages.forEach(msg -> {
+ final String payload = Helper.serialize(msg, clientHolder.getJsonMapping());
+ final String id = UUID.randomUUID().toString();
+
+ final SendMessageBatchRequestEntry entry = SendMessageBatchRequestEntry.builder()
+ // in batching we need to generate the id of a message for every entry.
+ .id(id)
+ .messageAttributes(null)
+ .messageGroupId(null)
+ .messageBody(payload)
+
+ .messageDeduplicationId(null)
+ .delaySeconds(0)
+ .messageSystemAttributesWithStrings(null)
+ .build();
+ messageMap.put(id, msg);
+ entryMap.put(id, entry);
+ });
+
+ final SendMessageBatchRequest request = SendMessageBatchRequest.builder()
+ .queueUrl(target.getTargetUrl())
+ .entries(entryMap.values())
+ .build();
+
+ // TODO: logging
+ Uni uni = Uni.createFrom().completionStage(clientHolder.getClient().sendMessageBatch(request))
+ .onItem().transformToUni(response -> handleResponse(response, messageMap));
+
+ // TODO: configurable retry. Retry complete batch in case the complete HTTP request fails
+ // but what to do in case just specific messages fails? In theory most efficient would be to retry
+ // them by adding them back to the stream. This seems very difficult. Another option is to do the configured
+ // retries for the failed once immediately until all are successful or max retries reached.
+ if (true) {
+ uni = uni.onFailure().retry()
+ .withBackOff(Duration.ofMillis(0), Duration.ofMillis(0))
+ .atMost(3);
+ }
+
+ // TODO: micrometer? Failure and Success?
+
+ return uni;
+ }
+
+ private static Uni handleResponse(
+ SendMessageBatchResponse response, Map> messageMap) {
+ final Uni successful = Multi.createFrom().iterable(response.successful())
+ .onItem().call(result -> {
+ OutgoingMessageMetadata.setResultOnMessage(messageMap.get(result.id()), result);
+ return Uni.createFrom().completionStage(messageMap.get(result.id()).ack());
+ }).collect().last().replaceWithVoid();
+
+ final Uni failed = Multi.createFrom().iterable(response.failed())
+ .onItem().call(result -> {
+ OutgoingMessageMetadata.setResultOnMessage(messageMap.get(result.id()), result);
+ return Uni.createFrom().completionStage(messageMap.get(result.id())
+ .nack(ex.illegalStateUnableToBuildClient(
+ new IllegalStateException(result.code() + ": " + result.message()))));
+ }).collect().last().replaceWithVoid();
+
+ return Uni.combine().all().unis(successful, failed).discardItems();
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/SendMessageAction.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/SendMessageAction.java
new file mode 100644
index 0000000000..af5fe264a5
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/SendMessageAction.java
@@ -0,0 +1,69 @@
+package io.smallrye.reactive.messaging.aws.sqs.action;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
+import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorOutgoingConfiguration;
+import io.smallrye.reactive.messaging.aws.sqs.client.SqsClientHolder;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsOutgoingMessage;
+import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsTrace;
+import io.smallrye.reactive.messaging.aws.sqs.util.Helper;
+import io.smallrye.reactive.messaging.tracing.TracingUtils;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+import java.time.Duration;
+
+import static io.smallrye.reactive.messaging.aws.sqs.tracing.SqsInstrumenter.SQS_OUTGOING_INSTRUMENTER;
+
+public class SendMessageAction {
+
+ public static Uni sendMessage(
+ final SqsClientHolder clientHolder, final SqsOutgoingMessage> message) {
+ String payload = Helper.serialize(message, clientHolder.getJsonMapping());
+
+ if (clientHolder.getConfig().getTracingEnabled()) {
+ SqsTrace trace = new SqsTrace()
+ .withQueue(message.getTarget().getTargetName())
+ // TODO: Cannot set messageId. It is provided in response.
+ // The intstrumenter documentation says:
+ // Call shouldStart(Context, Object) and do not proceed if it returns false.
+ // Call start(Context, Object) at the beginning of a request.
+ // Call end(Context, Object, Object, Throwable) at the end of a request.
+ // TracingUtils violates this, because end is called immediately and not at the end of a request.
+ // .withMessageId("")
+ .withConversationId(message.getSqsMetadata().getConversationId())
+ // We do not set payload size. This would require to calculate it, which is less performant.
+ ;
+ TracingUtils.traceOutgoing(SQS_OUTGOING_INSTRUMENTER, message, trace);
+ }
+
+ final SendMessageRequest request = SendMessageRequest.builder()
+ .queueUrl(message.getTarget().getTargetUrl())
+ .messageAttributes(null)
+ .messageGroupId(null)
+ .messageBody(payload)
+
+ .messageDeduplicationId(null)
+ .delaySeconds(0)
+ .messageSystemAttributesWithStrings(null)
+ .build();
+
+ // TODO: logging
+ Uni uni = Uni.createFrom().completionStage(clientHolder.getClient().sendMessage(request))
+ .onItem().transformToUni(response -> {
+ OutgoingMessageMetadata.setResultOnMessage(message, response);
+ // TODO: log
+ return Uni.createFrom().completionStage(message.ack());
+ });
+
+ // TODO: configurable retry
+ if (true) {
+ uni = uni.onFailure().retry()
+ .withBackOff(Duration.ofMillis(0), Duration.ofMillis(0))
+ .atMost(3);
+ }
+
+ // TODO: micrometer? Failure and Success?
+
+ return uni;
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/cache/TargetCache.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/cache/TargetCache.java
new file mode 100644
index 0000000000..50ad3ae685
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/cache/TargetCache.java
@@ -0,0 +1,28 @@
+package io.smallrye.reactive.messaging.aws.sqs.cache;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorCommonConfiguration;
+import io.smallrye.reactive.messaging.aws.sqs.Target;
+import io.smallrye.reactive.messaging.aws.sqs.action.GetQueueUrlAction;
+import io.smallrye.reactive.messaging.aws.sqs.client.SqsClientHolder;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsMessage;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsMessageMetadata;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TargetCache {
+
+ private final Map> CACHE = new ConcurrentHashMap<>();
+
+ public Uni getTarget(
+ final SqsClientHolder> clientHolder, final SqsMessage, M> message) {
+ final SqsConnectorCommonConfiguration config = clientHolder.getConfig();
+
+ return CACHE.computeIfAbsent(
+ clientHolder.getConfig().getQueue().orElse(config.getChannel()),
+ key -> GetQueueUrlAction.resolveQueueUrl(clientHolder, message)
+ .onItem().transform(url -> new Target(key, url))
+ .memoize().indefinitely());
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/client/SqsClientFactory.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/client/SqsClientFactory.java
new file mode 100644
index 0000000000..46c79e3260
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/client/SqsClientFactory.java
@@ -0,0 +1,16 @@
+package io.smallrye.reactive.messaging.aws.sqs.client;
+
+import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorIncomingConfiguration;
+import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorOutgoingConfiguration;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+public class SqsClientFactory {
+
+ public static SqsAsyncClient createSqsClient(final SqsConnectorIncomingConfiguration config) {
+ return null;
+ }
+
+ public static SqsAsyncClient createSqsClient(final SqsConnectorOutgoingConfiguration config) {
+ return null;
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/client/SqsClientHolder.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/client/SqsClientHolder.java
new file mode 100644
index 0000000000..ddf8fd8b80
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/client/SqsClientHolder.java
@@ -0,0 +1,36 @@
+package io.smallrye.reactive.messaging.aws.sqs.client;
+
+import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorCommonConfiguration;
+import io.smallrye.reactive.messaging.aws.sqs.cache.TargetCache;
+import io.smallrye.reactive.messaging.json.JsonMapping;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+public class SqsClientHolder {
+ private final SqsAsyncClient client;
+ private final C config;
+ private final JsonMapping jsonMapping;
+ private final TargetCache targetCache;
+
+ public SqsClientHolder(SqsAsyncClient client, C config, JsonMapping jsonMapping, TargetCache targetCache) {
+ this.client = client;
+ this.config = config;
+ this.jsonMapping = jsonMapping;
+ this.targetCache = targetCache;
+ }
+
+ public SqsAsyncClient getClient() {
+ return client;
+ }
+
+ public C getConfig() {
+ return config;
+ }
+
+ public JsonMapping getJsonMapping() {
+ return jsonMapping;
+ }
+
+ public TargetCache getTargetCache() {
+ return targetCache;
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/config/ConfigResolver.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/config/ConfigResolver.java
new file mode 100644
index 0000000000..a14ee078e6
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/config/ConfigResolver.java
@@ -0,0 +1,4 @@
+package io.smallrye.reactive.messaging.aws.sqs.config;
+
+public class ConfigResolver {
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/i18n/SqsExceptions.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/i18n/SqsExceptions.java
new file mode 100644
index 0000000000..a239879296
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/i18n/SqsExceptions.java
@@ -0,0 +1,27 @@
+package io.smallrye.reactive.messaging.aws.sqs.i18n;
+
+import org.jboss.logging.Messages;
+import org.jboss.logging.annotations.Cause;
+import org.jboss.logging.annotations.Message;
+import org.jboss.logging.annotations.MessageBundle;
+
+@MessageBundle(projectCode = "SRMSG", length = 5)
+public interface SqsExceptions {
+
+ SqsExceptions ex = Messages.getBundle(SqsExceptions.class);
+
+ @Message(id = 19100, value = "Unable to build Pulsar client")
+ IllegalStateException illegalStateUnableToBuildClient(@Cause Throwable t);
+
+ @Message(id = 19101, value = "Unable to build Pulsar consumer")
+ IllegalStateException illegalStateUnableToBuildConsumer(@Cause Throwable t);
+
+ @Message(id = 19102, value = "Unable to build Pulsar producer")
+ IllegalStateException illegalStateUnableToBuildProducer(@Cause Throwable t);
+
+ @Message(id = 19103, value = "Expecting downstream to consume without back-pressure")
+ IllegalStateException illegalStateConsumeWithoutBackPressure();
+
+ @Message(id = 19104, value = "Only one subscriber allowed")
+ IllegalStateException illegalStateOnlyOneSubscriber();
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/i18n/SqsLogging.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/i18n/SqsLogging.java
new file mode 100644
index 0000000000..dec83ca17b
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/i18n/SqsLogging.java
@@ -0,0 +1,18 @@
+package io.smallrye.reactive.messaging.aws.sqs.i18n;
+
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.Logger;
+import org.jboss.logging.annotations.Cause;
+import org.jboss.logging.annotations.LogMessage;
+import org.jboss.logging.annotations.Message;
+import org.jboss.logging.annotations.MessageLogger;
+
+@MessageLogger(projectCode = "SRMSG", length = 5)
+public interface SqsLogging extends BasicLogger {
+
+ SqsLogging log = Logger.getMessageLogger(SqsLogging.class, "io.smallrye.reactive.messaging.aws.sqs");
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 19002, value = "Unable to close Sqs client")
+ void unableToCloseClient(@Cause Throwable t);
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsMessage.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsMessage.java
new file mode 100644
index 0000000000..9cfd5e87a8
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsMessage.java
@@ -0,0 +1,27 @@
+package io.smallrye.reactive.messaging.aws.sqs.message;
+
+import io.smallrye.reactive.messaging.aws.sqs.Target;
+import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
+
+public abstract class SqsMessage implements ContextAwareMessage {
+
+ private Target target;
+ private final M metadata;
+
+ public SqsMessage(M metadata) {
+ this.metadata = metadata;
+ }
+
+ public Target getTarget() {
+ return target;
+ }
+
+ public SqsMessage withTarget(Target target) {
+ this.target = target;
+ return this;
+ }
+
+ public M getSqsMetadata() {
+ return metadata;
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsMessageMetadata.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsMessageMetadata.java
new file mode 100644
index 0000000000..f8d5a71d9d
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsMessageMetadata.java
@@ -0,0 +1,47 @@
+package io.smallrye.reactive.messaging.aws.sqs.message;
+
+public abstract class SqsMessageMetadata {
+
+ private String queue;
+ private String queueOwnerAWSAccountId;
+ private String conversationId;
+
+
+ /**
+ * Get the name of the queue
+ *
+ * @return the queue name
+ */
+ public String getQueue() {
+ return queue;
+ }
+
+ public SqsMessageMetadata withQueue(String queue) {
+ this.queue = queue;
+ return this;
+ }
+
+ /**
+ * During queue name resolving it is possible to overwrite the AWS account id. If not specified the
+ * AWS accounts id from the provided client credentials are used
+ *
+ * @return overwritten AWS account id
+ */
+ public String getQueueOwnerAWSAccountId() {
+ return queueOwnerAWSAccountId;
+ }
+
+ public SqsMessageMetadata withQueueOwnerAWSAccountId(String queueOwnerAWSAccountId) {
+ this.queueOwnerAWSAccountId = queueOwnerAWSAccountId;
+ return this;
+ }
+
+ public String getConversationId() {
+ return conversationId;
+ }
+
+ public SqsMessageMetadata withConversationId(String conversationId) {
+ this.conversationId = conversationId;
+ return this;
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsOutgoingMessage.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsOutgoingMessage.java
new file mode 100644
index 0000000000..fd28a8faf1
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsOutgoingMessage.java
@@ -0,0 +1,43 @@
+package io.smallrye.reactive.messaging.aws.sqs.message;
+
+import org.eclipse.microprofile.reactive.messaging.Message;
+
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class SqsOutgoingMessage extends SqsMessage {
+
+ private final T payload;
+ private final Supplier> ack;
+ private final Function> nack;
+
+ public SqsOutgoingMessage(final T payload, final SqsOutgoingMessageMetadata metadata, Supplier> ack,
+ Function> nack) {
+ super(metadata);
+ this.payload = payload;
+ this.ack = ack;
+ this.nack = nack;
+ }
+
+ @Override
+ public T getPayload() {
+ return payload;
+ }
+
+ @Override
+ public Supplier> getAck() {
+ return this.ack;
+ }
+
+ @Override
+ public Function> getNack() {
+ return this.nack;
+ }
+
+ public static SqsOutgoingMessage from(Message message) {
+ return new SqsOutgoingMessage<>(message.getPayload(), message.getMetadata(SqsOutgoingMessageMetadata.class)
+ .orElseGet(SqsOutgoingMessageMetadata::new), message.getAck(), message.getNack());
+ }
+
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsOutgoingMessageMetadata.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsOutgoingMessageMetadata.java
new file mode 100644
index 0000000000..ba24269f63
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsOutgoingMessageMetadata.java
@@ -0,0 +1,5 @@
+package io.smallrye.reactive.messaging.aws.sqs.message;
+
+public class SqsOutgoingMessageMetadata extends SqsMessageMetadata {
+
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsAttributesExtractor.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsAttributesExtractor.java
new file mode 100644
index 0000000000..cbb42640ed
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsAttributesExtractor.java
@@ -0,0 +1,24 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
+
+public enum SqsAttributesExtractor implements AttributesExtractor {
+ INSTANCE;
+
+ @Override
+ public void onStart(final AttributesBuilder attributes, final Context parentContext, final SqsTrace request) {
+
+ }
+
+ @Override
+ public void onEnd(
+ final AttributesBuilder attributes,
+ final Context context,
+ final SqsTrace request,
+ final Void response,
+ final Throwable error) {
+
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsInstrumenter.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsInstrumenter.java
new file mode 100644
index 0000000000..9642f20399
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsInstrumenter.java
@@ -0,0 +1,28 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
+
+import static io.smallrye.reactive.messaging.tracing.TracingUtils.INSTRUMENTATION_NAME;
+
+public class SqsInstrumenter {
+
+ public static final Instrumenter SQS_OUTGOING_INSTRUMENTER;
+
+ static {
+ final InstrumenterBuilder instrumenterBuilder = Instrumenter.builder(GlobalOpenTelemetry.get(),
+ INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(
+ SqsMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH));
+
+ SQS_OUTGOING_INSTRUMENTER = instrumenterBuilder
+ .addAttributesExtractor(MessagingAttributesExtractor.create(
+ SqsMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH))
+ .addAttributesExtractor(SqsAttributesExtractor.INSTANCE)
+ .buildProducerInstrumenter(SqsTraceTextMapSetter.INSTANCE);
+ }
+
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsMessagingAttributesGetter.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsMessagingAttributesGetter.java
new file mode 100644
index 0000000000..59e01c64bf
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsMessagingAttributesGetter.java
@@ -0,0 +1,54 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
+import io.opentelemetry.instrumentation.api.instrumenter.network.NetworkAttributesGetter;
+
+public enum SqsMessagingAttributesGetter implements MessagingAttributesGetter,
+ NetworkAttributesGetter {
+ INSTANCE;
+
+ @Override
+ public String getSystem(SqsTrace request) {
+ return "AmazonSQS";
+ }
+
+ @Override
+ public String getDestination(SqsTrace request) {
+ return request.getQueue();
+ }
+
+ @Override
+ public String getNetworkProtocolName(SqsTrace request, Void response) {
+ return "sqs";
+ }
+
+ @Override
+ public String getNetworkProtocolVersion(SqsTrace request, Void response) {
+ return "2012-11-05";
+ }
+
+ @Override
+ public boolean isTemporaryDestination(SqsTrace request) {
+ return false;
+ }
+
+ @Override
+ public String getConversationId(SqsTrace request) {
+ return request.getConversationId();
+ }
+
+ @Override
+ public Long getMessagePayloadSize(SqsTrace request) {
+ return request.getMessagePayloadSize();
+ }
+
+ @Override
+ public Long getMessagePayloadCompressedSize(SqsTrace request) {
+ return null;
+ }
+
+ @Override
+ public String getMessageId(SqsTrace request, Void response) {
+ return request.getMessageId();
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTrace.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTrace.java
new file mode 100644
index 0000000000..5d04ec380f
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTrace.java
@@ -0,0 +1,44 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+public class SqsTrace {
+ private String queue;
+ private String conversationId;
+ private String messageId;
+ private Long messagePayloadSize;
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public SqsTrace withQueue(String queue) {
+ this.queue = queue;
+ return this;
+ }
+
+ public String getConversationId() {
+ return conversationId;
+ }
+
+ public SqsTrace withConversationId(String conversationId) {
+ this.conversationId = conversationId;
+ return this;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+
+ public SqsTrace withMessageId(String messageId) {
+ this.messageId = messageId;
+ return this;
+ }
+
+ public Long getMessagePayloadSize() {
+ return messagePayloadSize;
+ }
+
+ public SqsTrace withMessagePayloadSize(Long messagePayloadSize) {
+ this.messagePayloadSize = messagePayloadSize;
+ return this;
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTraceTextMapSetter.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTraceTextMapSetter.java
new file mode 100644
index 0000000000..7ba28f5f91
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/tracing/SqsTraceTextMapSetter.java
@@ -0,0 +1,14 @@
+package io.smallrye.reactive.messaging.aws.sqs.tracing;
+
+import io.opentelemetry.context.propagation.TextMapSetter;
+
+public enum SqsTraceTextMapSetter implements TextMapSetter {
+ INSTANCE;
+
+ @Override
+ public void set(SqsTrace carrier, String key, String value) {
+ if (carrier != null) {
+
+ }
+ }
+}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/util/Helper.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/util/Helper.java
new file mode 100644
index 0000000000..cc629d3d66
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/util/Helper.java
@@ -0,0 +1,16 @@
+package io.smallrye.reactive.messaging.aws.sqs.util;
+
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsMessage;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsMessageMetadata;
+import io.smallrye.reactive.messaging.json.JsonMapping;
+
+public class Helper {
+
+ public static String serialize(SqsMessage, M> message, JsonMapping jsonMapping) {
+ if (message.getPayload() instanceof String) {
+ return (String) message.getPayload();
+ } else {
+ return jsonMapping.toJson(message.getPayload());
+ }
+ }
+}
diff --git a/smallrye-reactive-messaging-otel/src/main/java/io/smallrye/reactive/messaging/tracing/TracingUtils.java b/smallrye-reactive-messaging-otel/src/main/java/io/smallrye/reactive/messaging/tracing/TracingUtils.java
index 74bc791822..ee1604a106 100644
--- a/smallrye-reactive-messaging-otel/src/main/java/io/smallrye/reactive/messaging/tracing/TracingUtils.java
+++ b/smallrye-reactive-messaging-otel/src/main/java/io/smallrye/reactive/messaging/tracing/TracingUtils.java
@@ -12,6 +12,8 @@
public class TracingUtils {
+ public static final String INSTRUMENTATION_NAME = "io.smallrye.reactive.messaging";
+
private TracingUtils() {
}
From 03f837cd9744e0bfb14be6701c67da0ae89c50b9 Mon Sep 17 00:00:00 2001
From: holomekc <30546982+holomekc@users.noreply.github.com>
Date: Sun, 1 Oct 2023 09:54:12 +0200
Subject: [PATCH 02/14] Prepare create queue if not exists
---
.../messaging/aws/sqs/SqsConnector.java | 51 +++++-----
.../messaging/aws/sqs/SqsOutgoingChannel.java | 19 ++--
.../TargetCache.java => TargetResolver.java} | 16 ++--
.../aws/sqs/action/CreateQueueAction.java | 95 +++++++++++++++++++
.../aws/sqs/action/GetQueueUrlAction.java | 14 +--
.../sqs/action/SendBatchMessageAction.java | 79 ++++++++-------
.../aws/sqs/action/SendMessageAction.java | 26 +----
.../aws/sqs/client/SqsClientHolder.java | 12 +--
.../sqs/message/SqsCreateQueueMetadata.java | 30 ++++++
.../aws/sqs/message/SqsMessageMetadata.java | 24 +++++
.../aws/sqs/message/SqsOutgoingMessage.java | 6 +-
.../aws/sqs/tracing/SqsInstrumenter.java | 4 +-
12 files changed, 261 insertions(+), 115 deletions(-)
rename smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/{cache/TargetCache.java => TargetResolver.java} (72%)
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/CreateQueueAction.java
create mode 100644 smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/message/SqsCreateQueueMetadata.java
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java
index a2bad4c407..09fac81a3b 100644
--- a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java
@@ -1,12 +1,17 @@
package io.smallrye.reactive.messaging.aws.sqs;
-import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
-import io.smallrye.reactive.messaging.aws.sqs.cache.TargetCache;
-import io.smallrye.reactive.messaging.aws.sqs.client.SqsClientHolder;
-import io.smallrye.reactive.messaging.connector.InboundConnector;
-import io.smallrye.reactive.messaging.connector.OutboundConnector;
-import io.smallrye.reactive.messaging.health.HealthReporter;
-import io.smallrye.reactive.messaging.json.JsonMapping;
+import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING;
+import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING;
+import static io.smallrye.reactive.messaging.aws.sqs.client.SqsClientFactory.createSqsClient;
+import static io.smallrye.reactive.messaging.aws.sqs.i18n.SqsExceptions.ex;
+import static io.smallrye.reactive.messaging.aws.sqs.i18n.SqsLogging.log;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Flow;
+
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
@@ -15,30 +20,31 @@
import jakarta.enterprise.event.Reception;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
+
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
+
+import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
+import io.smallrye.reactive.messaging.aws.sqs.client.SqsClientHolder;
+import io.smallrye.reactive.messaging.connector.InboundConnector;
+import io.smallrye.reactive.messaging.connector.OutboundConnector;
+import io.smallrye.reactive.messaging.health.HealthReporter;
+import io.smallrye.reactive.messaging.json.JsonMapping;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.SqsException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Flow;
-
-import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING;
-import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING;
-import static io.smallrye.reactive.messaging.aws.sqs.client.SqsClientFactory.createSqsClient;
-import static io.smallrye.reactive.messaging.aws.sqs.i18n.SqsExceptions.ex;
-import static io.smallrye.reactive.messaging.aws.sqs.i18n.SqsLogging.log;
-
@ApplicationScoped
@Connector(SqsConnector.CONNECTOR_NAME)
// common
@ConnectorAttribute(name = "queue", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the SQS queue. If not set, the channel name is used")
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")
@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true")
+@ConnectorAttribute(name = "create-queue.enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether automatic queue creation is enabled or disabled (default)", defaultValue = "false")
+@ConnectorAttribute(name = "create-queue.dlq.enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether automatic dead-letter queue creation is enabled or disabled (default)", defaultValue = "false")
+@ConnectorAttribute(name = "create-queue.dlq.prefix", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Dead-letter queue name prefix", defaultValue = "")
+@ConnectorAttribute(name = "create-queue.dlq.suffix", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Dead-letter queue name suffix", defaultValue = "-dlq")
+@ConnectorAttribute(name = "create-queue.dlq.max-receive-count", type = "int", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The number of times a message is delivered to the source queue before being moved to the dead-letter queue. Default: 10. When the ReceiveCount for a message exceeds the maxReceiveCount for a queue, Amazon SQS moves the message to the dead-letter-queue.", defaultValue = "10")
// outgoing
@ConnectorAttribute(name = "send.batch.enabled", type = "boolean", direction = OUTGOING, description = "Send messages in batches.", defaultValue = "false")
@@ -90,7 +96,8 @@ public Flow.Subscriber extends Message>> getSubscriber(Config config) {
clientsByChannel.put(oc.getChannel(), client);
try {
- SqsOutgoingChannel channel = new SqsOutgoingChannel(new SqsClientHolder<>(client, oc, jsonMapping, new TargetCache()));
+ SqsOutgoingChannel channel = new SqsOutgoingChannel(
+ new SqsClientHolder<>(client, oc, jsonMapping, new TargetResolver()));
outgoingChannels.add(channel);
return channel.getSubscriber();
} catch (SqsException e) {
@@ -99,9 +106,7 @@ public Flow.Subscriber extends Message>> getSubscriber(Config config) {
}
public void terminate(
- @Observes(notifyObserver = Reception.IF_EXISTS)
- @Priority(50)
- @BeforeDestroyed(ApplicationScoped.class) Object event) {
+ @Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object event) {
// incomingChannels.forEach(PulsarIncomingChannel::close);
outgoingChannels.forEach(SqsOutgoingChannel::close);
for (SqsAsyncClient client : clients.values()) {
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutgoingChannel.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutgoingChannel.java
index cca9ae3487..7e85c2566e 100644
--- a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutgoingChannel.java
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutgoingChannel.java
@@ -1,5 +1,13 @@
package io.smallrye.reactive.messaging.aws.sqs;
+import static io.smallrye.reactive.messaging.aws.sqs.tracing.SqsInstrumenter.SQS_OUTGOING_INSTRUMENTER;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.Flow;
+
+import org.eclipse.microprofile.reactive.messaging.Message;
+
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
@@ -11,13 +19,6 @@
import io.smallrye.reactive.messaging.aws.sqs.tracing.SqsTrace;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
-import org.eclipse.microprofile.reactive.messaging.Message;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.concurrent.Flow;
-
-import static io.smallrye.reactive.messaging.aws.sqs.tracing.SqsInstrumenter.SQS_OUTGOING_INSTRUMENTER;
public class SqsOutgoingChannel {
@@ -95,8 +96,8 @@ private void tracing(SqsOutgoingMessage> message) {
// possible as mentioned. Or it would be necessary to create the ids here. I do not like that.
// .withMessageId("")
.withConversationId(message.getSqsMetadata().getConversationId())
- // We do not set payload size. This would require to calculate it, which is less performant.
- ;
+ // We do not set payload size. This would require to calculate it, which is less performant.
+ ;
TracingUtils.traceOutgoing(SQS_OUTGOING_INSTRUMENTER, message, trace);
}
}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/cache/TargetCache.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/TargetResolver.java
similarity index 72%
rename from smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/cache/TargetCache.java
rename to smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/TargetResolver.java
index 50ad3ae685..2d7dff6226 100644
--- a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/cache/TargetCache.java
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/TargetResolver.java
@@ -1,17 +1,17 @@
-package io.smallrye.reactive.messaging.aws.sqs.cache;
+package io.smallrye.reactive.messaging.aws.sqs;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import io.smallrye.mutiny.Uni;
-import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorCommonConfiguration;
-import io.smallrye.reactive.messaging.aws.sqs.Target;
+import io.smallrye.reactive.messaging.aws.sqs.action.CreateQueueAction;
import io.smallrye.reactive.messaging.aws.sqs.action.GetQueueUrlAction;
import io.smallrye.reactive.messaging.aws.sqs.client.SqsClientHolder;
import io.smallrye.reactive.messaging.aws.sqs.message.SqsMessage;
import io.smallrye.reactive.messaging.aws.sqs.message.SqsMessageMetadata;
+import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class TargetCache {
+public class TargetResolver {
private final Map> CACHE = new ConcurrentHashMap<>();
@@ -22,6 +22,8 @@ public Uni getTarget(
return CACHE.computeIfAbsent(
clientHolder.getConfig().getQueue().orElse(config.getChannel()),
key -> GetQueueUrlAction.resolveQueueUrl(clientHolder, message)
+ .onFailure(QueueDoesNotExistException.class)
+ .call(() -> CreateQueueAction.createQueue(clientHolder, message))
.onItem().transform(url -> new Target(key, url))
.memoize().indefinitely());
}
diff --git a/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/CreateQueueAction.java b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/CreateQueueAction.java
new file mode 100644
index 0000000000..6f7d930000
--- /dev/null
+++ b/smallrye-reactive-messaging-aws/smallrye-reactive-messaging-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/action/CreateQueueAction.java
@@ -0,0 +1,95 @@
+package io.smallrye.reactive.messaging.aws.sqs.action;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorCommonConfiguration;
+import io.smallrye.reactive.messaging.aws.sqs.client.SqsClientHolder;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsCreateQueueMetadata;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsMessage;
+import io.smallrye.reactive.messaging.aws.sqs.message.SqsMessageMetadata;
+import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
+import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
+import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
+import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
+
+import java.util.Map;
+
+/**
+ * AWS
+ * Documentation
+ */
+public class CreateQueueAction {
+
+ public static Uni createQueue(
+ SqsClientHolder> clientHolder, SqsMessage, M> message) {
+ SqsConnectorCommonConfiguration config = clientHolder.getConfig();
+
+ if (!config.getCreateQueueEnabled()) {
+ return Uni.createFrom().nullItem();
+ }
+
+ String queueName = message.getTarget().getTargetName();
+ Uni