diff --git a/.github/workflows/upload-docs.yml b/.github/workflows/upload-docs.yml index 0d38cba04..18d44a80b 100644 --- a/.github/workflows/upload-docs.yml +++ b/.github/workflows/upload-docs.yml @@ -29,12 +29,11 @@ jobs: with: candidate: java version: '17.0.1-tem' - - name: Set up JDK 8 - uses: actions/setup-java@v4 + - name: Set up JDK + uses: actions/setup-java@v1 with: java-version: 8 - distribution: 'adopt' - cache: 'maven' + jdkFile: ${{ steps.sdkman.outputs.file }} - uses: actions/cache@v1 with: path: ~/.m2/repository diff --git a/docs/pom.xml b/docs/pom.xml index 2cae357ba..6c0178768 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -6,7 +6,7 @@ io.awspring.cloud spring-cloud-aws - 3.2.0-SNAPSHOT + 3.2.0 spring-cloud-aws-docs pom diff --git a/docs/src/main/asciidoc/_configprops.adoc b/docs/src/main/asciidoc/_configprops.adoc index e44304c89..339570a12 100644 --- a/docs/src/main/asciidoc/_configprops.adoc +++ b/docs/src/main/asciidoc/_configprops.adoc @@ -5,7 +5,8 @@ |spring.cloud.aws.cloudwatch.region | | Overrides the default region. |spring.cloud.aws.credentials.access-key | | The access key to be used with a static provider. |spring.cloud.aws.credentials.instance-profile | `+++false+++` | Configures an instance profile credentials provider with no further configuration. -|spring.cloud.aws.credentials.profile | | The AWS profile. +|spring.cloud.aws.credentials.profile.name | | Profile name. +|spring.cloud.aws.credentials.profile.path | | Profile file path. |spring.cloud.aws.credentials.secret-key | | The secret key to be used with a static provider. |spring.cloud.aws.credentials.sts.async-credentials-update | `+++false+++` | Enables provider to asynchronously fetch credentials in the background. Defaults to synchronous blocking if not specified otherwise. |spring.cloud.aws.credentials.sts.role-arn | | ARN of IAM role associated with STS. If not provided this will be read from {@link software.amazon.awssdk.core.SdkSystemSetting}. @@ -39,7 +40,8 @@ |spring.cloud.aws.parameterstore.reload.period | `+++1m+++` | Refresh period for {@link PollingAwsPropertySourceChangeDetector}. |spring.cloud.aws.parameterstore.reload.strategy | | Reload strategy to run when properties change. |spring.cloud.aws.region.instance-profile | `+++false+++` | Configures an instance profile region provider with no further configuration. -|spring.cloud.aws.region.profile | | The AWS profile. +|spring.cloud.aws.region.profile.name | | Profile name. +|spring.cloud.aws.region.profile.path | | Profile file path. |spring.cloud.aws.region.static | | |spring.cloud.aws.s3.accelerate-mode-enabled | | Option to enable using the accelerate endpoint when accessing S3. Accelerate endpoints allow faster transfer of objects by using Amazon CloudFront's globally distributed edge locations. |spring.cloud.aws.s3.checksum-validation-enabled | | Option to disable doing a validation of the checksum of an object stored in S3. @@ -76,6 +78,7 @@ |spring.cloud.aws.sqs.listener.max-concurrent-messages | | The maximum concurrent messages that can be processed simultaneously for each queue. Note that if acknowledgement batching is being used, the actual maximum number of messages inflight might be higher. |spring.cloud.aws.sqs.listener.max-messages-per-poll | | The maximum number of messages to be retrieved in a single poll to SQS. |spring.cloud.aws.sqs.listener.poll-timeout | | The maximum amount of time for a poll to SQS. +|spring.cloud.aws.sqs.queue-not-found-strategy | | |spring.cloud.aws.sqs.region | | Overrides the default region. |=== \ No newline at end of file diff --git a/docs/src/main/asciidoc/docker-compose.adoc b/docs/src/main/asciidoc/docker-compose.adoc new file mode 100644 index 000000000..2ef7c5a67 --- /dev/null +++ b/docs/src/main/asciidoc/docker-compose.adoc @@ -0,0 +1,34 @@ +[#spring-cloud-aws-docker-compose] +== Docker Compose + +Spring Cloud AWS provides Docker Compose support for https://docs.localstack.cloud/references/docker-images/[LocalStack docker images] which simplifies local development of Spring Cloud AWS based projects. + +Maven coordinates, using <>: + +[source,xml] +---- + + io.awspring.cloud + spring-cloud-aws-docker-compose + +---- + +For more information about Spring Docker Compose support please refer to https://docs.spring.io/spring-boot/reference/features/docker-compose.html[official Spring documentation] + +=== Example docker-compose.yaml file + +[source,yaml] +---- +services: + localstack: + image: localstack/localstack + environment: + AWS_ACCESS_KEY_ID: noop + AWS_SECRET_ACCESS_KEY: noop + AWS_DEFAULT_REGION: eu-central-1 + ports: + - "4566:4566" +---- + +`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_DEFAULT_REGION` are required environment variables to ensure proper integration. + diff --git a/docs/src/main/asciidoc/index.adoc b/docs/src/main/asciidoc/index.adoc index 4899b5b3f..14382435d 100644 --- a/docs/src/main/asciidoc/index.adoc +++ b/docs/src/main/asciidoc/index.adoc @@ -154,6 +154,8 @@ include::spring-modulith.adoc[] include::testing.adoc[] +include::docker-compose.adoc[] + include::migration.adoc[] == Configuration properties diff --git a/docs/src/main/asciidoc/s3.adoc b/docs/src/main/asciidoc/s3.adoc index bdb781925..8a7df66a7 100644 --- a/docs/src/main/asciidoc/s3.adoc +++ b/docs/src/main/asciidoc/s3.adoc @@ -208,7 +208,7 @@ s3Template.upload(BUCKET, "file.txt", is, ObjectMetadata.builder().contentType(" Another feature of `S3Template` is the ability to generate signed URLs for getting/putting S3 objects in a single method call. [source,java] ---- -URL signedGetUrl = s3Template.createSignedGetUrl("bucket_name", "file.txt", Duration.ofMinutes(5)); +URL signedGetUrl = s3Template.createSignedGetURL("bucket_name", "file.txt", Duration.ofMinutes(5)); ---- `S3Template` also allows storing & retrieving Java objects. diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index d1d4ca766..bd60f4194 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -618,6 +618,9 @@ NOTE: Queues declared in the same annotation will share the container, though ea ===== SNS Messages Since 3.1.1, when receiving SNS messages through the `@SqsListener`, the message includes all attributes of the `SnsNotification`. To only receive need the `Message` part of the payload, you can utilize the `@SnsNotificationMessage` annotation. + +For handling individual message processing, the @SnsNotificationMessage annotation can be used in the following manner: + [source, java] ---- @SqsListener("my-queue") @@ -626,6 +629,16 @@ public void listen(@SnsNotificationMessage Pojo pojo) { } ---- +For batch message processing, use the @SnsNotificationMessage annotation with a List parameter. + +[source, java] +---- +@SqsListener("my-queue") +public void listen(@SnsNotificationMessage List pojos) { + System.out.println(pojos.size()); +} +---- + ===== Specifying a MessageListenerContainerFactory A `MessageListenerContainerFactory` can be specified through the `factory` property. Such factory will then be used to create the container for the annotated method. @@ -677,6 +690,7 @@ AcknowledgementMode must be set to `MANUAL` (see <>) - `BatchAcknowledgement` - provides methods for manually acknowledging partial or whole message batches for batch listeners. AcknowledgementMode must be set to `MANUAL` (see <>) - `Visibility` - provides the `changeTo()` method that enables changing the message's visibility to the provided value. +- `BatchVisibility` - provides `changeTo()` methods that enables changing partial or whole message batches visibility to the provided value. - `QueueAttributes` - provides queue attributes for the queue that received the message. See <> for how to specify the queue attributes that will be fetched from `SQS` - `software.amazon.awssdk.services.sqs.model.Message` - provides the original `Message` from `SQS` @@ -699,7 +713,7 @@ public void listen(Message message, MyPojo pojo, MessageHeaders headers, } ---- -IMPORTANT: Batch listeners support a single `List` and `List>` method arguments, and an optional `BatchAcknowledgement` or `AsyncBatchAcknowledgement` arguments. +IMPORTANT: Batch listeners support a single `List` and `List>` method arguments, and optional `BatchAcknowledgement` (or `AsyncBatchAcknowledgement`) and `BatchVisibility` arguments. `MessageHeaders` should be extracted from the `Message` instances through the `getHeaders()` method. ==== Batch Processing @@ -711,7 +725,9 @@ When batch mode is enabled, the framework will serve the entire result of a poll If a value greater than 10 is provided for `maxMessagesPerPoll`, the result of multiple polls will be combined and up to the respective amount of messages will be served to the listener. To enable batch processing using `@SqsListener`, a single `List` or `List>` method argument should be provided in the listener method. -The listener method can also have an optional `BatchAcknowledgement` argument for `AcknowledgementMode.MANUAL`. +The listener method can also have: +- an optional `BatchAcknowledgement` argument for `AcknowledgementMode.MANUAL` +- an optional `BatchVisibility` argument Alternatively, `SqsContainerOptions` can be set to `ListenerMode.BATCH` in the `SqsContainerOptions` in the factory or container. @@ -786,6 +802,7 @@ The Spring Boot Starter for SQS provides the following auto-configuration proper | <> | Maximum number of inflight messages per queue. | No | 10 | <> | Maximum number of messages to be received per poll. | No | 10 | <> | Maximum amount of time to wait for messages in a poll. | No | 10 seconds +| `spring.cloud.aws.sqs.queue-not-found-strategy` | The strategy to be used by SqsTemplate and SqsListeners when a queue does not exist. | No | CREATE |=== @@ -884,6 +901,12 @@ See <>. |Configures the `MessageSystemAttribute` that will be retrieved from SQS for each message. See <>. +|`fifoBatchGroupingStrategy` +|`PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES`, `PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH` +|`PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES` +|Specifies how messages from FIFO queues should be grouped when retrieved by the container when listener +mode is `batch`. See <>. + |`messageConverter` |`MessagingMessageConverter` |`SqsMessagingMessageConverter` @@ -1029,10 +1052,9 @@ NOTE: Spring-managed `MessageListenerContainer` beans' lifecycle actions are alw * Messages are polled with a `receiveRequestAttemptId`, and the received batch of messages is split according to the message`s `MessageGroupId`. * Each message from a given group will then be processed in order, while each group is processed in parallel. -* If processing fails for a message, the following messages from the same message group are discarded so they will be served again after their `message visibility` -expires. +* To receive messages from multiple groups in a `batch`, set `fifoBatchGroupingStrategy` to `PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH` in `SqsContainerOptions`. +* If processing fails for a message, the following messages from the same message group are discarded so they will be served again after their `message visibility` expires. * Messages which were already successfully processed and acknowledged will not be served again. -* If a `batch` listener is used, each message group from a poll will be served as a batch to the listener method. * `FIFO` queues also have different defaults for acknowledging messages, see <> for more information. * If a `message visibility` is set through `@SqsListener` or `SqsContainerOptions`, visibility will be extended for all messages in the message group before each message is processed. diff --git a/pom.xml b/pom.xml index 6a9d8a63f..7545f15e9 100644 --- a/pom.xml +++ b/pom.xml @@ -6,13 +6,13 @@ org.springframework.cloud spring-cloud-build - 4.1.0 + 4.1.3 io.awspring.cloud spring-cloud-aws - 3.2.0-SNAPSHOT + 3.2.0 pom Spring Cloud AWS @@ -37,6 +37,7 @@ spring-cloud-aws-core spring-cloud-aws-autoconfigure spring-cloud-aws-dependencies + spring-cloud-aws-docker-compose spring-cloud-aws-parameter-store spring-cloud-aws-secrets-manager spring-cloud-aws-ses diff --git a/spring-cloud-aws-autoconfigure/pom.xml b/spring-cloud-aws-autoconfigure/pom.xml index acdd0c317..662473f4d 100644 --- a/spring-cloud-aws-autoconfigure/pom.xml +++ b/spring-cloud-aws-autoconfigure/pom.xml @@ -7,7 +7,7 @@ io.awspring.cloud spring-cloud-aws - 3.2.0-SNAPSHOT + 3.2.0 spring-cloud-aws-autoconfigure diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index aa41805a9..8cefc01a8 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2022 the original author or authors. + * Copyright 2013-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,13 +24,14 @@ import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration; import io.awspring.cloud.sqs.config.SqsListenerConfigurer; import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; -import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; +import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder; import io.awspring.cloud.sqs.operations.SqsTemplate; import io.awspring.cloud.sqs.operations.SqsTemplateBuilder; +import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -45,12 +46,15 @@ import org.springframework.context.annotation.Import; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder; +import software.amazon.awssdk.services.sqs.model.Message; /** * {@link EnableAutoConfiguration Auto-configuration} for SQS integration. * * @author Tomaz Fernandes * @author Maciej Walkowiak + * @author Wei Jiang + * @author Dongha Kim * @since 3.0 */ @AutoConfiguration @@ -78,10 +82,12 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder @ConditionalOnMissingBean @Bean - public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider objectMapperProvider) { - SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient); - objectMapperProvider - .ifAvailable(om -> builder.configureDefaultConverter(converter -> converter.setObjectMapper(om))); + public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider objectMapperProvider, MessagingMessageConverter messageConverter) { + SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient).messageConverter(messageConverter); + objectMapperProvider.ifAvailable(om -> setMapperToConverter(messageConverter, om)); + if (sqsProperties.getQueueNotFoundStrategy() != null) { + builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy())); + } return builder.build(); } @@ -92,28 +98,36 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac ObjectProvider> errorHandler, ObjectProvider> asyncInterceptors, ObjectProvider> interceptors, - ObjectProvider objectMapperProvider) { + ObjectProvider objectMapperProvider, + MessagingMessageConverter messagingMessageConverter) { SqsMessageListenerContainerFactory factory = new SqsMessageListenerContainerFactory<>(); - factory.configure(this::configureContainerOptions); + factory.configure(this::configureProperties); sqsAsyncClient.ifAvailable(factory::setSqsAsyncClient); asyncErrorHandler.ifAvailable(factory::setErrorHandler); errorHandler.ifAvailable(factory::setErrorHandler); interceptors.forEach(factory::addMessageInterceptor); asyncInterceptors.forEach(factory::addMessageInterceptor); - objectMapperProvider.ifAvailable(objectMapper -> setObjectMapper(factory, objectMapper)); + objectMapperProvider.ifAvailable(om -> setMapperToConverter(messagingMessageConverter, om)); + factory.configure(options -> options.messageConverter(messagingMessageConverter)); return factory; } - private void setObjectMapper(SqsMessageListenerContainerFactory factory, ObjectMapper objectMapper) { - // Object Mapper for early deserialization in MessageSource - var messageConverter = new SqsMessagingMessageConverter(); - messageConverter.setObjectMapper(objectMapper); - factory.configure(options -> options.messageConverter(messageConverter)); + private void setMapperToConverter(MessagingMessageConverter messagingMessageConverter, ObjectMapper om) { + if (messagingMessageConverter instanceof SqsMessagingMessageConverter sqsConverter) { + sqsConverter.setObjectMapper(om); + } } - private void configureContainerOptions(ContainerOptionsBuilder options) { + @ConditionalOnMissingBean + @Bean + public MessagingMessageConverter messageConverter() { + return new SqsMessagingMessageConverter(); + } + + private void configureProperties(SqsContainerOptionsBuilder options) { PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull(); + mapper.from(this.sqsProperties.getQueueNotFoundStrategy()).to(options::queueNotFoundStrategy); mapper.from(this.sqsProperties.getListener().getMaxConcurrentMessages()).to(options::maxConcurrentMessages); mapper.from(this.sqsProperties.getListener().getMaxMessagesPerPoll()).to(options::maxMessagesPerPoll); mapper.from(this.sqsProperties.getListener().getPollTimeout()).to(options::pollTimeout); diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java index 7facf18cf..d372ccad1 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java @@ -16,6 +16,8 @@ package io.awspring.cloud.autoconfigure.sqs; import io.awspring.cloud.autoconfigure.AwsClientProperties; +import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; + import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.lang.Nullable; @@ -24,6 +26,7 @@ * Properties related to AWS SQS. * * @author Tomaz Fernandes + * @author Wei Jiang * @since 3.0 */ @ConfigurationProperties(prefix = SqsProperties.PREFIX) @@ -44,6 +47,26 @@ public void setListener(Listener listener) { this.listener = listener; } + @Nullable + private QueueNotFoundStrategy queueNotFoundStrategy; + + /** + * Return the strategy to use if the queue is not found. + * @return the {@link QueueNotFoundStrategy} + */ + @Nullable + public QueueNotFoundStrategy getQueueNotFoundStrategy() { + return queueNotFoundStrategy; + } + + /** + * Set the strategy to use if the queue is not found. + * @param queueNotFoundStrategy the strategy to set. + */ + public void setQueueNotFoundStrategy(QueueNotFoundStrategy queueNotFoundStrategy) { + this.queueNotFoundStrategy = queueNotFoundStrategy; + } + public static class Listener { /** diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationIntegrationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationIntegrationTest.java index 461d4d930..6d3f5ee2e 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationIntegrationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationIntegrationTest.java @@ -16,12 +16,18 @@ package io.awspring.cloud.autoconfigure.sqs; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import io.awspring.cloud.sqs.QueueAttributesResolvingException; import io.awspring.cloud.sqs.annotation.SqsListener; import io.awspring.cloud.sqs.operations.SqsTemplate; +import org.testcontainers.shaded.org.bouncycastle.util.Arrays; +import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; + +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -29,6 +35,7 @@ import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.ApplicationContextException; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.testcontainers.containers.localstack.LocalStackContainer; @@ -40,6 +47,7 @@ * Integration tests for {@link SqsAutoConfiguration}. * * @author Tomaz Fernandes + * @author Wei Jiang */ @SpringBootTest @Testcontainers @@ -49,18 +57,30 @@ class SqsAutoConfigurationIntegrationTest { private static final String PAYLOAD = "Test"; - private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withPropertyValues("spring.cloud.aws.sqs.region=eu-west-1", - "spring.cloud.aws.sqs.endpoint=" + localstack.getEndpoint(), - "spring.cloud.aws.credentials.access-key=noop", "spring.cloud.aws.credentials.secret-key=noop", - "spring.cloud.aws.region.static=eu-west-1") - .withConfiguration(AutoConfigurations.of(RegionProviderAutoConfiguration.class, - CredentialsProviderAutoConfiguration.class, SqsAutoConfiguration.class, AwsAutoConfiguration.class, - ListenerConfiguration.class)); - @Container static LocalStackContainer localstack = new LocalStackContainer( - DockerImageName.parse("localstack/localstack:3.2.0")); + DockerImageName.parse("localstack/localstack:3.2.0")); + + static { + localstack.start(); + } + + private static final String[] BASE_PARAMS = {"spring.cloud.aws.sqs.region=eu-west-1", + "spring.cloud.aws.sqs.endpoint=" + localstack.getEndpoint(), + "spring.cloud.aws.credentials.access-key=noop", "spring.cloud.aws.credentials.secret-key=noop", + "spring.cloud.aws.region.static=eu-west-1"}; + + private static final AutoConfigurations BASE_CONFIGURATIONS = AutoConfigurations.of(RegionProviderAutoConfiguration.class, + CredentialsProviderAutoConfiguration.class, SqsAutoConfiguration.class, AwsAutoConfiguration.class, + ListenerConfiguration.class); + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withPropertyValues(BASE_PARAMS) + .withConfiguration(BASE_CONFIGURATIONS); + + private final ApplicationContextRunner applicationContextRunnerWithFailStrategy = new ApplicationContextRunner() + .withPropertyValues(Arrays.append(BASE_PARAMS, "spring.cloud.aws.sqs.queue-not-found-strategy=fail")) + .withConfiguration(BASE_CONFIGURATIONS); @SuppressWarnings("unchecked") @Test @@ -73,6 +93,25 @@ void sendsAndReceivesMessage() { }); } + @Test + void containerReceivesMessageWithFailQueueNotFoundStrategy() { + applicationContextRunnerWithFailStrategy.run(context -> + assertThatThrownBy(() -> context.getBean(SqsTemplate.class).send(to -> to.queue("QUEUE_DOES_NOT_EXISTS").payload(PAYLOAD))) + .isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class).cause() + .isInstanceOf(CompletionException.class).cause().isInstanceOf(QueueAttributesResolvingException.class) + .cause().isInstanceOf(QueueDoesNotExistException.class)); + } + + @Test + void templateReceivesMessageWithFailQueueNotFoundStrategy() { + applicationContextRunnerWithFailStrategy + .run(context -> + assertThatThrownBy(() -> context.getBean(SqsTemplate.class).receive("QUEUE_DOES_NOT_EXIST", String.class)) + .isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class).cause() + .isInstanceOf(CompletionException.class).cause().isInstanceOf(QueueAttributesResolvingException.class) + .cause().isInstanceOf(QueueDoesNotExistException.class)); + } + static class Listener { @Autowired diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java index 57a72f0ea..e8003804d 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java @@ -30,11 +30,16 @@ import io.awspring.cloud.sqs.config.EndpointRegistrar; import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration; import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; +import io.awspring.cloud.sqs.listener.AbstractContainerOptions; import io.awspring.cloud.sqs.listener.ContainerOptions; import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; +import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; +import io.awspring.cloud.sqs.listener.MessageListenerContainerRegistry; +import io.awspring.cloud.sqs.listener.SqsContainerOptions; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; +import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; import java.net.URI; import java.time.Duration; @@ -55,15 +60,18 @@ import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder; +import software.amazon.awssdk.services.sqs.model.Message; /** * Tests for {@link SqsAutoConfiguration}. * * @author Tomaz Fernandes + * @author Wei Jiang */ class SqsAutoConfigurationTest { private static final String CUSTOM_OBJECT_MAPPER_BEAN_NAME = "customObjectMapper"; + private static final String CUSTOM_MESSAGE_CONVERTER_BEAN_NAME = "customMessageConverter"; private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() .withPropertyValues("spring.cloud.aws.region.static:eu-west-1") @@ -110,6 +118,18 @@ void withCustomEndpoint() { }); } + @Test + void withCustomQueueNotFoundStrategy() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.queue-not-found-strategy=fail").run(context -> { + assertThat(context).hasSingleBean(SqsProperties.class); + SqsProperties sqsProperties = context.getBean(SqsProperties.class); + assertThat(context).hasSingleBean(SqsAsyncClient.class); + assertThat(context).hasSingleBean(SqsTemplate.class); + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + assertThat(sqsProperties.getQueueNotFoundStrategy()).isEqualTo(QueueNotFoundStrategy.FAIL); + }); + } + // @formatter:off @Test void customSqsClientConfigurer() { @@ -158,7 +178,7 @@ void configuresFactoryComponentsAndOptions() { .asInstanceOf(type(CompositeMessageConverter.class)) .extracting(CompositeMessageConverter::getConverters) .isInstanceOfSatisfying(List.class, converters -> - assertThat(converters.get(1)).isInstanceOfSatisfying( + assertThat(converters.get(2)).isInstanceOfSatisfying( MappingJackson2MessageConverter.class, jackson2MessageConverter -> assertThat(jackson2MessageConverter.getObjectMapper().getRegisteredModuleIds()).contains("jackson-datatype-jsr310"))); @@ -185,7 +205,7 @@ void configuresFactoryComponentsAndOptionsWithDefaults() { .asInstanceOf(type(CompositeMessageConverter.class)) .extracting(CompositeMessageConverter::getConverters) .isInstanceOfSatisfying(List.class, converters -> - assertThat(converters.get(1)).isInstanceOfSatisfying( + assertThat(converters.get(2)).isInstanceOfSatisfying( MappingJackson2MessageConverter.class, jackson2MessageConverter -> assertThat(jackson2MessageConverter.getObjectMapper().getRegisteredModuleIds()).isEmpty())); @@ -205,6 +225,30 @@ void configuresObjectMapper() { }); } + @Test + void configuresMessageConverter() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") + .withUserConfiguration(ObjectMapperConfiguration.class, MessageConverterConfiguration.class) + .run(context -> { + SqsTemplate sqsTemplate = context.getBean("sqsTemplate", SqsTemplate.class); + SqsMessageListenerContainerFactory factory = context.getBean("defaultSqsListenerContainerFactory", SqsMessageListenerContainerFactory.class); + ObjectMapper objectMapper = context.getBean(CUSTOM_OBJECT_MAPPER_BEAN_NAME, ObjectMapper.class); + SqsMessagingMessageConverter converter = context.getBean(CUSTOM_MESSAGE_CONVERTER_BEAN_NAME, SqsMessagingMessageConverter.class); + assertThat(converter.getPayloadMessageConverter()) + .extracting("converters") + .asList() + .filteredOn(conv -> conv instanceof MappingJackson2MessageConverter) + .first() + .extracting("objectMapper") + .isEqualTo(objectMapper); + assertThat(sqsTemplate).extracting("messageConverter").isEqualTo(converter); + assertThat(factory) + .extracting("containerOptionsBuilder") + .extracting("messageConverter") + .isEqualTo(converter); + }); + } + @Configuration(proxyBeanMethods = false) static class CustomComponentsConfiguration { @@ -232,6 +276,16 @@ ObjectMapper objectMapper() { } + @Configuration(proxyBeanMethods = false) + static class MessageConverterConfiguration { + + @Bean(name = CUSTOM_MESSAGE_CONVERTER_BEAN_NAME) + MessagingMessageConverter messageConverter() { + return new SqsMessagingMessageConverter(); + } + + } + @Configuration(proxyBeanMethods = false) static class CustomAwsAsyncClientConfig { diff --git a/spring-cloud-aws-core/pom.xml b/spring-cloud-aws-core/pom.xml index c49c39061..49fb07290 100644 --- a/spring-cloud-aws-core/pom.xml +++ b/spring-cloud-aws-core/pom.xml @@ -7,7 +7,7 @@ io.awspring.cloud spring-cloud-aws - 3.2.0-SNAPSHOT + 3.2.0 spring-cloud-aws-core diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml index e05a09104..23671ffc7 100644 --- a/spring-cloud-aws-dependencies/pom.xml +++ b/spring-cloud-aws-dependencies/pom.xml @@ -19,16 +19,16 @@ io.awspring.cloud spring-cloud-aws-dependencies Spring Cloud AWS Dependencies - 3.2.0-SNAPSHOT + 3.2.0 pom 2.31.0 - 2.25.21 + 2.25.70 2.0.4 3.0.1 1.6 - 4.1.0 + 4.1.4 2.1.0 1.0.0 1.14.9 @@ -142,6 +142,12 @@ ${project.version} + + io.awspring.cloud + spring-cloud-aws-docker-compose + ${project.version} + + io.awspring.cloud spring-cloud-aws-starter diff --git a/spring-cloud-aws-docker-compose/pom.xml b/spring-cloud-aws-docker-compose/pom.xml new file mode 100644 index 000000000..680c3ed26 --- /dev/null +++ b/spring-cloud-aws-docker-compose/pom.xml @@ -0,0 +1,33 @@ + + + + + spring-cloud-aws + io.awspring.cloud + 3.2.0 + + 4.0.0 + + spring-cloud-aws-docker-compose + Spring Cloud AWS Docker Compose + Spring Cloud AWS Docker Compose Integration + + + + io.awspring.cloud + spring-cloud-aws-autoconfigure + + + org.springframework.boot + spring-boot-docker-compose + + + org.springframework + spring-test + test + + + + diff --git a/spring-cloud-aws-docker-compose/src/main/java/io/awspring/cloud/docker/compose/AwsDockerComposeConnectionDetailsFactory.java b/spring-cloud-aws-docker-compose/src/main/java/io/awspring/cloud/docker/compose/AwsDockerComposeConnectionDetailsFactory.java new file mode 100644 index 000000000..94e524fac --- /dev/null +++ b/spring-cloud-aws-docker-compose/src/main/java/io/awspring/cloud/docker/compose/AwsDockerComposeConnectionDetailsFactory.java @@ -0,0 +1,82 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.docker.compose; + +import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; +import java.net.URI; +import org.springframework.boot.docker.compose.core.RunningService; +import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionDetailsFactory; +import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionSource; + +/** + * {@link DockerComposeConnectionDetailsFactory} to create {@link AwsConnectionDetails} for a {@code localstack} + * service. + * + * @author Dominik Kovács + * @since 3.2.0 + */ +class AwsDockerComposeConnectionDetailsFactory extends DockerComposeConnectionDetailsFactory { + + private static final String[] LOCALSTACK_CONTAINER_NAMES = { "localstack/localstack", "localstack/localstack-pro" }; + + private static final int LOCALSTACK_PORT = 4566; + + AwsDockerComposeConnectionDetailsFactory() { + super(LOCALSTACK_CONTAINER_NAMES); + } + + @Override + protected AwsConnectionDetails getDockerComposeConnectionDetails(DockerComposeConnectionSource source) { + return new AwsDockerComposeConnectionDetails(source.getRunningService()); + } + + /** + * {@link DockerComposeConnectionDetails} backed by a {@code localstack} {@link RunningService}. + */ + private static final class AwsDockerComposeConnectionDetails extends DockerComposeConnectionDetails + implements AwsConnectionDetails { + + private final LocalStackEnvironment environment; + + private final URI endpoint; + + private AwsDockerComposeConnectionDetails(RunningService service) { + super(service); + this.environment = new LocalStackEnvironment(service.env()); + this.endpoint = URI.create("http://%s:%s".formatted(service.host(), service.ports().get(LOCALSTACK_PORT))); + } + + @Override + public URI getEndpoint() { + return this.endpoint; + } + + @Override + public String getRegion() { + return this.environment.getRegion(); + } + + @Override + public String getAccessKey() { + return this.environment.getAccessKey(); + } + + @Override + public String getSecretKey() { + return this.environment.getSecretKey(); + } + } +} diff --git a/spring-cloud-aws-docker-compose/src/main/java/io/awspring/cloud/docker/compose/LocalStackEnvironment.java b/spring-cloud-aws-docker-compose/src/main/java/io/awspring/cloud/docker/compose/LocalStackEnvironment.java new file mode 100644 index 000000000..ee1c9f708 --- /dev/null +++ b/spring-cloud-aws-docker-compose/src/main/java/io/awspring/cloud/docker/compose/LocalStackEnvironment.java @@ -0,0 +1,51 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.docker.compose; + +import java.util.Map; + +/** + * LocalStack environment details. + * + * @author Dominik Kovács + * @since 3.2.0 + */ +class LocalStackEnvironment { + + private final String region; + + private final String accessKey; + + private final String secretKey; + + LocalStackEnvironment(Map env) { + this.region = env.get("AWS_DEFAULT_REGION"); + this.accessKey = env.get("AWS_ACCESS_KEY_ID"); + this.secretKey = env.get("AWS_SECRET_ACCESS_KEY"); + } + + String getRegion() { + return this.region; + } + + String getAccessKey() { + return this.accessKey; + } + + String getSecretKey() { + return this.secretKey; + } +} diff --git a/spring-cloud-aws-docker-compose/src/main/java/io/awspring/cloud/docker/compose/package-info.java b/spring-cloud-aws-docker-compose/src/main/java/io/awspring/cloud/docker/compose/package-info.java new file mode 100644 index 000000000..a006addab --- /dev/null +++ b/spring-cloud-aws-docker-compose/src/main/java/io/awspring/cloud/docker/compose/package-info.java @@ -0,0 +1,6 @@ +/** + * Docker Compose integration. + */ +@org.springframework.lang.NonNullApi +@org.springframework.lang.NonNullFields +package io.awspring.cloud.docker.compose; diff --git a/spring-cloud-aws-docker-compose/src/main/resources/META-INF/spring.factories b/spring-cloud-aws-docker-compose/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..4c74052b9 --- /dev/null +++ b/spring-cloud-aws-docker-compose/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.service.connection.ConnectionDetailsFactory=\ +io.awspring.cloud.docker.compose.AwsDockerComposeConnectionDetailsFactory diff --git a/spring-cloud-aws-docker-compose/src/test/java/io/awspring/cloud/docker/compose/AwsDockerComposeConnectionDetailsFactoryTest.java b/spring-cloud-aws-docker-compose/src/test/java/io/awspring/cloud/docker/compose/AwsDockerComposeConnectionDetailsFactoryTest.java new file mode 100644 index 000000000..99e2e5610 --- /dev/null +++ b/spring-cloud-aws-docker-compose/src/test/java/io/awspring/cloud/docker/compose/AwsDockerComposeConnectionDetailsFactoryTest.java @@ -0,0 +1,50 @@ +package io.awspring.cloud.docker.compose; + +import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; + +import java.io.IOException; +import java.net.URI; +import java.util.LinkedHashMap; + +import static org.assertj.core.api.Assertions.assertThat; + +class AwsDockerComposeConnectionDetailsFactoryTest { + + private final Resource dockerComposeResource = new ClassPathResource("docker-compose.yaml"); + + @AfterAll + static void shutDown() { + var shutdownHandlers = SpringApplication.getShutdownHandlers(); + ((Runnable) shutdownHandlers).run(); + } + + @Test + void runCreatesConnectionDetailsThatCanAccessLocalStack() throws IOException { + var application = new SpringApplication(Config.class); + var properties = new LinkedHashMap(); + properties.put("spring.docker.compose.skip.in-tests", "false"); + properties.put("spring.docker.compose.file", dockerComposeResource.getFile()); + properties.put("spring.docker.compose.stop.command", "down"); + application.setDefaultProperties(properties); + var connectionDetails = application.run().getBean(AwsConnectionDetails.class); + + assertThat(connectionDetails.getAccessKey()).isEqualTo("noop"); + assertThat(connectionDetails.getSecretKey()).isEqualTo("noop"); + assertThat(connectionDetails.getRegion()).isEqualTo("eu-central-1"); + assertThat(connectionDetails.getEndpoint()).satisfiesAnyOf( + endpoint -> assertThat(endpoint).isEqualTo(URI.create("http://localhost:4566")), + endpoint -> assertThat(endpoint).isEqualTo(URI.create("http://127.0.0.1:4566"))); + } + + @Configuration(proxyBeanMethods = false) + static class Config { + + } +} diff --git a/spring-cloud-aws-docker-compose/src/test/java/io/awspring/cloud/docker/compose/LocalStackEnvironmentTest.java b/spring-cloud-aws-docker-compose/src/test/java/io/awspring/cloud/docker/compose/LocalStackEnvironmentTest.java new file mode 100644 index 000000000..3b4a928a5 --- /dev/null +++ b/spring-cloud-aws-docker-compose/src/test/java/io/awspring/cloud/docker/compose/LocalStackEnvironmentTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.docker.compose; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.util.Collections; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class LocalStackEnvironmentTest { + + @Test + void getRegionWhenRegionIsNotSet() { + var environment = new LocalStackEnvironment(Collections.emptyMap()); + assertThat(environment.getRegion()).isNull(); + } + + @Test + void getRegionWhenRegionIsSet() { + var environment = new LocalStackEnvironment(Map.of("AWS_DEFAULT_REGION", "us-west-1")); + assertThat(environment.getRegion()).isEqualTo("us-west-1"); + } + + @Test + void getAccessKeyWhenAccessKeyIsNotSet() { + var environment = new LocalStackEnvironment(Collections.emptyMap()); + assertThat(environment.getAccessKey()).isNull(); + } + + @Test + void getAccessKeyWhenAccessKeyIsSet() { + var environment = new LocalStackEnvironment(Map.of("AWS_ACCESS_KEY_ID", "access-key")); + assertThat(environment.getAccessKey()).isEqualTo("access-key"); + } + + @Test + void getSecretKeyWhenSecretKeyIsNotSet() { + var environment = new LocalStackEnvironment(Collections.emptyMap()); + assertThat(environment.getSecretKey()).isNull(); + } + + @Test + void getSecretKeyWhenSecretKeyIsSet() { + var environment = new LocalStackEnvironment(Map.of("AWS_SECRET_ACCESS_KEY", "secret-key")); + assertThat(environment.getSecretKey()).isEqualTo("secret-key"); + } +} diff --git a/spring-cloud-aws-docker-compose/src/test/resources/docker-compose.yaml b/spring-cloud-aws-docker-compose/src/test/resources/docker-compose.yaml new file mode 100644 index 000000000..f10dc6ab3 --- /dev/null +++ b/spring-cloud-aws-docker-compose/src/test/resources/docker-compose.yaml @@ -0,0 +1,9 @@ +services: + localstack: + image: localstack/localstack + environment: + AWS_ACCESS_KEY_ID: noop + AWS_SECRET_ACCESS_KEY: noop + AWS_DEFAULT_REGION: eu-central-1 + ports: + - "4566:4566" diff --git a/spring-cloud-aws-dynamodb/pom.xml b/spring-cloud-aws-dynamodb/pom.xml index 24613ce29..7064aae2a 100644 --- a/spring-cloud-aws-dynamodb/pom.xml +++ b/spring-cloud-aws-dynamodb/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 spring-cloud-aws-dynamodb diff --git a/spring-cloud-aws-modulith/pom.xml b/spring-cloud-aws-modulith/pom.xml index a88325060..884a4c67a 100644 --- a/spring-cloud-aws-modulith/pom.xml +++ b/spring-cloud-aws-modulith/pom.xml @@ -5,7 +5,7 @@ io.awspring.cloud spring-cloud-aws - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 diff --git a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/pom.xml b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/pom.xml index 568e60210..a4172bfd1 100644 --- a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/pom.xml +++ b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/pom.xml @@ -5,7 +5,7 @@ io.awspring.cloud spring-cloud-aws-modulith - 3.2.0-SNAPSHOT + 3.2.0 Spring Cloud AWS Modulith - Events - SNS support diff --git a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/pom.xml b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/pom.xml index 553af4382..6f750a8f5 100644 --- a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/pom.xml +++ b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/pom.xml @@ -5,7 +5,7 @@ io.awspring.cloud spring-cloud-aws-modulith - 3.2.0-SNAPSHOT + 3.2.0 Spring Cloud AWS Modulith - Events - SQS support diff --git a/spring-cloud-aws-parameter-store/pom.xml b/spring-cloud-aws-parameter-store/pom.xml index 37302f44d..4ebf051d1 100644 --- a/spring-cloud-aws-parameter-store/pom.xml +++ b/spring-cloud-aws-parameter-store/pom.xml @@ -5,7 +5,7 @@ io.awspring.cloud spring-cloud-aws - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 diff --git a/spring-cloud-aws-s3/pom.xml b/spring-cloud-aws-s3/pom.xml index 561e6f8cf..9e3fd5966 100644 --- a/spring-cloud-aws-s3/pom.xml +++ b/spring-cloud-aws-s3/pom.xml @@ -7,7 +7,7 @@ io.awspring.cloud spring-cloud-aws - 3.2.0-SNAPSHOT + 3.2.0 spring-cloud-aws-s3 diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/S3Resource.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/S3Resource.java index c125f02d0..42a089913 100644 --- a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/S3Resource.java +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/S3Resource.java @@ -26,6 +26,7 @@ import org.springframework.core.io.WritableResource; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetUrlRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; @@ -82,6 +83,9 @@ public S3Resource(Location location, S3Client s3Client, S3OutputStreamProvider s @Override public URL getURL() throws IOException { + if (!StringUtils.hasText(this.location.getObject())) { + return new URL("https", location.getBucket() + ".s3.amazonaws.com", "/"); + } GetUrlRequest getUrlRequest = GetUrlRequest.builder().bucket(this.getLocation().getBucket()) .key(this.location.getObject()).versionId(this.location.getVersion()).build(); return s3Client.utilities().getUrl(getUrlRequest); diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java index 84dfb1fa5..fce24e04f 100644 --- a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java @@ -152,6 +152,13 @@ void returnsResourceUrl(S3OutputStreamProvider s3OutputStreamProvider) throws IO .isEqualTo("http://127.0.0.1:" + localstack.getFirstMappedPort() + "/first-bucket/a-file.txt"); } + @TestAvailableOutputStreamProviders + void returnsEmptyUrlToBucketWhenObjectIsEmpty(S3OutputStreamProvider s3OutputStreamProvider) throws IOException { + S3Resource resource = s3Resource("s3://first-bucket/", s3OutputStreamProvider); + assertThat(resource.getURL().toString()) + .isEqualTo("https://first-bucket.s3.amazonaws.com/"); + } + @TestAvailableOutputStreamProviders void returnsEncodedResourceUrlAndUri(S3OutputStreamProvider s3OutputStreamProvider) throws IOException, URISyntaxException { diff --git a/spring-cloud-aws-samples/infrastructure/pom.xml b/spring-cloud-aws-samples/infrastructure/pom.xml index 7df4bb468..d05cbb373 100644 --- a/spring-cloud-aws-samples/infrastructure/pom.xml +++ b/spring-cloud-aws-samples/infrastructure/pom.xml @@ -6,7 +6,7 @@ io.awspring.cloud spring-cloud-aws-samples - 3.2.0-SNAPSHOT + 3.2.0 spring-cloud-aws-samples-infrastructure diff --git a/spring-cloud-aws-samples/pom.xml b/spring-cloud-aws-samples/pom.xml index 20bbb3ae7..2f9a8c5e5 100644 --- a/spring-cloud-aws-samples/pom.xml +++ b/spring-cloud-aws-samples/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 @@ -15,6 +15,7 @@ infrastructure + spring-cloud-aws-docker-compose-sample spring-cloud-aws-dynamodb-sample spring-cloud-aws-parameter-store-sample spring-cloud-aws-s3-sample diff --git a/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/docker-compose.yaml b/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/docker-compose.yaml new file mode 100644 index 000000000..167c27580 --- /dev/null +++ b/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/docker-compose.yaml @@ -0,0 +1,11 @@ +services: + localstack: + image: localstack/localstack + container_name: localstack + environment: + SERVICES: sqs + AWS_ACCESS_KEY_ID: noop + AWS_SECRET_ACCESS_KEY: noop + AWS_DEFAULT_REGION: eu-central-1 + ports: + - "4566:4566" diff --git a/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/pom.xml b/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/pom.xml new file mode 100644 index 000000000..6b7781ff7 --- /dev/null +++ b/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/pom.xml @@ -0,0 +1,34 @@ + + + + spring-cloud-aws-samples + io.awspring.cloud + 3.2.0 + + 4.0.0 + + spring-cloud-aws-docker-compose-sample + Spring Cloud AWS Docker Compose Sample + + + + org.springframework.boot + spring-boot-starter + + + + io.awspring.cloud + spring-cloud-aws-starter-sqs + + + + io.awspring.cloud + spring-cloud-aws-docker-compose + runtime + true + + + + diff --git a/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/src/main/java/io/awspring/cloud/samples/docker/compose/Application.java b/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/src/main/java/io/awspring/cloud/samples/docker/compose/Application.java new file mode 100644 index 000000000..4137cf58f --- /dev/null +++ b/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/src/main/java/io/awspring/cloud/samples/docker/compose/Application.java @@ -0,0 +1,45 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.samples.docker.compose; + +import io.awspring.cloud.sqs.annotation.SqsListener; +import io.awspring.cloud.sqs.operations.SqsTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class Application { + + private static final Logger log = LoggerFactory.getLogger(Application.class); + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + @Bean + ApplicationRunner applicationRunner(SqsTemplate sqsTemplate) { + return args -> sqsTemplate.send("test-queue", "Hello World"); + } + + @SqsListener("test-queue") + public void receiveMessage(String message) { + log.info("Received message: {}", message); + } +} diff --git a/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/src/main/resources/application.properties b/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/src/main/resources/application.properties new file mode 100644 index 000000000..83c5d7aad --- /dev/null +++ b/spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/src/main/resources/application.properties @@ -0,0 +1,6 @@ +spring.application.name=spring-cloud-aws-docker-compose-sample + +# Notice we don't need to configure spring.cloud.aws.* properties (endpoint, region, access keys), +# since this is configured in the docker-compose.yaml file and automatically picked up by the application + +spring.docker.compose.file=spring-cloud-aws-samples/spring-cloud-aws-docker-compose-sample/docker-compose.yaml diff --git a/spring-cloud-aws-samples/spring-cloud-aws-dynamodb-sample/pom.xml b/spring-cloud-aws-samples/spring-cloud-aws-dynamodb-sample/pom.xml index da1f38097..5eec50e12 100644 --- a/spring-cloud-aws-samples/spring-cloud-aws-dynamodb-sample/pom.xml +++ b/spring-cloud-aws-samples/spring-cloud-aws-dynamodb-sample/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws-samples io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 diff --git a/spring-cloud-aws-samples/spring-cloud-aws-parameter-store-sample/pom.xml b/spring-cloud-aws-samples/spring-cloud-aws-parameter-store-sample/pom.xml index e9450d494..5f7d2e310 100644 --- a/spring-cloud-aws-samples/spring-cloud-aws-parameter-store-sample/pom.xml +++ b/spring-cloud-aws-samples/spring-cloud-aws-parameter-store-sample/pom.xml @@ -5,7 +5,7 @@ io.awspring.cloud spring-cloud-aws-samples - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 diff --git a/spring-cloud-aws-samples/spring-cloud-aws-s3-sample/pom.xml b/spring-cloud-aws-samples/spring-cloud-aws-s3-sample/pom.xml index 21b92d449..cedf90ebe 100644 --- a/spring-cloud-aws-samples/spring-cloud-aws-s3-sample/pom.xml +++ b/spring-cloud-aws-samples/spring-cloud-aws-s3-sample/pom.xml @@ -5,7 +5,7 @@ io.awspring.cloud spring-cloud-aws-samples - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 diff --git a/spring-cloud-aws-samples/spring-cloud-aws-secrets-manager-sample/pom.xml b/spring-cloud-aws-samples/spring-cloud-aws-secrets-manager-sample/pom.xml index 8d2f923b6..b66ff9084 100644 --- a/spring-cloud-aws-samples/spring-cloud-aws-secrets-manager-sample/pom.xml +++ b/spring-cloud-aws-samples/spring-cloud-aws-secrets-manager-sample/pom.xml @@ -5,7 +5,7 @@ io.awspring.cloud spring-cloud-aws-samples - 3.2.0-SNAPSHOT + 3.2.0 spring-cloud-aws-secrets-manager-sample diff --git a/spring-cloud-aws-samples/spring-cloud-aws-ses-sample/pom.xml b/spring-cloud-aws-samples/spring-cloud-aws-ses-sample/pom.xml index 10951220e..cb7ae0d5a 100644 --- a/spring-cloud-aws-samples/spring-cloud-aws-ses-sample/pom.xml +++ b/spring-cloud-aws-samples/spring-cloud-aws-ses-sample/pom.xml @@ -5,7 +5,7 @@ io.awspring.cloud spring-cloud-aws-samples - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 diff --git a/spring-cloud-aws-samples/spring-cloud-aws-sns-sample/pom.xml b/spring-cloud-aws-samples/spring-cloud-aws-sns-sample/pom.xml index d13150453..d0a3e0952 100644 --- a/spring-cloud-aws-samples/spring-cloud-aws-sns-sample/pom.xml +++ b/spring-cloud-aws-samples/spring-cloud-aws-sns-sample/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws-samples io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 spring-cloud-aws-sns-sample diff --git a/spring-cloud-aws-samples/spring-cloud-aws-sqs-sample/pom.xml b/spring-cloud-aws-samples/spring-cloud-aws-sqs-sample/pom.xml index 7069acc53..a9df1171c 100644 --- a/spring-cloud-aws-samples/spring-cloud-aws-sqs-sample/pom.xml +++ b/spring-cloud-aws-samples/spring-cloud-aws-sqs-sample/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws-samples io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 spring-cloud-aws-sqs-sample diff --git a/spring-cloud-aws-secrets-manager/pom.xml b/spring-cloud-aws-secrets-manager/pom.xml index a7bbf9aed..abc28cc1b 100644 --- a/spring-cloud-aws-secrets-manager/pom.xml +++ b/spring-cloud-aws-secrets-manager/pom.xml @@ -5,7 +5,7 @@ io.awspring.cloud spring-cloud-aws - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 diff --git a/spring-cloud-aws-ses/pom.xml b/spring-cloud-aws-ses/pom.xml index 16924aef5..51e43baf5 100644 --- a/spring-cloud-aws-ses/pom.xml +++ b/spring-cloud-aws-ses/pom.xml @@ -5,7 +5,7 @@ io.awspring.cloud spring-cloud-aws - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 diff --git a/spring-cloud-aws-ses/src/main/java/io/awspring/cloud/ses/SimpleEmailServiceMailSender.java b/spring-cloud-aws-ses/src/main/java/io/awspring/cloud/ses/SimpleEmailServiceMailSender.java index c757d575a..a89537199 100644 --- a/spring-cloud-aws-ses/src/main/java/io/awspring/cloud/ses/SimpleEmailServiceMailSender.java +++ b/spring-cloud-aws-ses/src/main/java/io/awspring/cloud/ses/SimpleEmailServiceMailSender.java @@ -125,7 +125,9 @@ protected String getConfigurationSetName() { private SendEmailRequest prepareMessage(SimpleMailMessage simpleMailMessage) { Assert.notNull(simpleMailMessage, "simpleMailMessage are required"); Destination.Builder destinationBuilder = Destination.builder(); - destinationBuilder.toAddresses(simpleMailMessage.getTo()); + if (simpleMailMessage.getTo() != null) { + destinationBuilder.toAddresses(simpleMailMessage.getTo()); + } if (simpleMailMessage.getCc() != null) { destinationBuilder.ccAddresses(simpleMailMessage.getCc()); diff --git a/spring-cloud-aws-ses/src/test/java/io/awspring/cloud/ses/SimpleEmailServiceMailSenderTest.java b/spring-cloud-aws-ses/src/test/java/io/awspring/cloud/ses/SimpleEmailServiceMailSenderTest.java index b0fe71bac..a51c34b0c 100644 --- a/spring-cloud-aws-ses/src/test/java/io/awspring/cloud/ses/SimpleEmailServiceMailSenderTest.java +++ b/spring-cloud-aws-ses/src/test/java/io/awspring/cloud/ses/SimpleEmailServiceMailSenderTest.java @@ -110,6 +110,32 @@ void testSendSimpleMailWithCCandBCC() { .isEqualTo(Objects.requireNonNull(simpleMailMessage.getBcc())[0]); } + @Test + void testSendSimpleMailWithNoTo() { + SesClient emailService = mock(SesClient.class); + SimpleEmailServiceMailSender mailSender = new SimpleEmailServiceMailSender(emailService); + + // Not using createSimpleMailMessage as we don't want the to address set. + SimpleMailMessage simpleMailMessage = new SimpleMailMessage(); + simpleMailMessage.setFrom("sender@domain.com"); + simpleMailMessage.setSubject("message subject"); + simpleMailMessage.setText("message body"); + + simpleMailMessage.setBcc("bcc@domain.com"); + + ArgumentCaptor request = ArgumentCaptor.forClass(SendEmailRequest.class); + when(emailService.sendEmail(request.capture())) + .thenReturn(SendEmailResponse.builder().messageId("123").build()); + + mailSender.send(simpleMailMessage); + + SendEmailRequest sendEmailRequest = request.getValue(); + assertThat(sendEmailRequest.message().subject().data()).isEqualTo(simpleMailMessage.getSubject()); + assertThat(sendEmailRequest.message().body().text().data()).isEqualTo(simpleMailMessage.getText()); + assertThat(sendEmailRequest.destination().bccAddresses().get(0)) + .isEqualTo(Objects.requireNonNull(simpleMailMessage.getBcc())[0]); + } + @Test void testSendMultipleMails() { SesClient emailService = mock(SesClient.class); diff --git a/spring-cloud-aws-sns/pom.xml b/spring-cloud-aws-sns/pom.xml index 15bdda47f..cbec0264e 100644 --- a/spring-cloud-aws-sns/pom.xml +++ b/spring-cloud-aws-sns/pom.xml @@ -6,7 +6,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 diff --git a/spring-cloud-aws-sqs/pom.xml b/spring-cloud-aws-sqs/pom.xml index e28cc4f29..bf1ee4775 100644 --- a/spring-cloud-aws-sqs/pom.xml +++ b/spring-cloud-aws-sqs/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 4.0.0 diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/CollectionUtils.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/CollectionUtils.java new file mode 100644 index 000000000..667baeb48 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/CollectionUtils.java @@ -0,0 +1,38 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class CollectionUtils { + + public static List> partition(Collection messagesToAck, int pageSize) { + List messagesToUse = getAsList(messagesToAck); + int totalSize = messagesToUse.size(); + return IntStream.rangeClosed(0, (totalSize - 1) / pageSize) + .mapToObj(index -> messagesToUse.subList(index * pageSize, Math.min((index + 1) * pageSize, totalSize))) + .collect(Collectors.toList()); + } + + private static List getAsList(Collection elements) { + return elements instanceof List ? (List) elements : new ArrayList<>(elements); + } + +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java index d9ee062b8..9ae4f9f1f 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java @@ -124,9 +124,8 @@ String pollTimeoutSeconds() default ""; /** - * The maximum number of messages to poll from SQS. If a value greater than 10 is provided, the result of - * multiple polls will be combined, which can be useful for - * {@link io.awspring.cloud.sqs.listener.ListenerMode#BATCH} + * The maximum number of messages to poll from SQS. If a value greater than 10 is provided, the result of multiple + * polls will be combined, which can be useful for {@link io.awspring.cloud.sqs.listener.ListenerMode#BATCH} * @return the maximum messages per poll. */ String maxMessagesPerPoll() default ""; diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java index 47b745828..36345d4a3 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java @@ -20,6 +20,7 @@ import io.awspring.cloud.sqs.config.SqsBeanNames; import io.awspring.cloud.sqs.config.SqsEndpoint; import io.awspring.cloud.sqs.listener.SqsHeaders; +import io.awspring.cloud.sqs.support.resolver.BatchVisibilityHandlerMethodArgumentResolver; import io.awspring.cloud.sqs.support.resolver.NotificationMessageArgumentResolver; import io.awspring.cloud.sqs.support.resolver.QueueAttributesMethodArgumentResolver; import io.awspring.cloud.sqs.support.resolver.SqsMessageMethodArgumentResolver; @@ -73,7 +74,8 @@ protected String getMessageListenerContainerRegistryBeanName() { @Override protected Collection createAdditionalArgumentResolvers() { return Arrays.asList(new VisibilityHandlerMethodArgumentResolver(SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER), - new SqsMessageMethodArgumentResolver(), new QueueAttributesMethodArgumentResolver()); + new BatchVisibilityHandlerMethodArgumentResolver(SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER), new SqsMessageMethodArgumentResolver(), + new QueueAttributesMethodArgumentResolver()); } @Override diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractEndpoint.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractEndpoint.java index dbb273c75..8194b535b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractEndpoint.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractEndpoint.java @@ -16,6 +16,7 @@ package io.awspring.cloud.sqs.config; import io.awspring.cloud.sqs.listener.AsyncMessageListener; +import io.awspring.cloud.sqs.listener.BatchVisibility; import io.awspring.cloud.sqs.listener.ListenerMode; import io.awspring.cloud.sqs.listener.MessageListener; import io.awspring.cloud.sqs.listener.MessageListenerContainer; @@ -29,6 +30,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.springframework.core.BridgeMethodResolver; import org.springframework.core.MethodParameter; import org.springframework.lang.Nullable; @@ -112,28 +114,33 @@ public void configureListenerMode(Consumer consumer) { List parameters = getMethodParameters(); boolean batch = hasParameterOfType(parameters, List.class); boolean batchAckParameter = hasParameterOfType(parameters, BatchAcknowledgement.class); - Assert.isTrue(hasValidParameters(batch, batchAckParameter, parameters.size()), getInvalidParametersMessage()); + boolean batchVisibilityParameter = hasParameterOfType(parameters, BatchVisibility.class); + Assert.isTrue(hasValidParameters(batch, batchAckParameter, batchVisibilityParameter, parameters.size()), + getInvalidParametersMessage()); consumer.accept(batch ? ListenerMode.BATCH : ListenerMode.SINGLE_MESSAGE); } - private boolean hasValidParameters(boolean batch, boolean batchAckParameter, int size) { - return hasValidSingleMessageParameters(batch, batchAckParameter) - || hasValidBatchParameters(batchAckParameter, size); + private boolean hasValidParameters(boolean batch, boolean batchAckParameter, boolean batchVisibilityParameter, + int size) { + return hasValidSingleMessageParameters(batch, batchAckParameter, batchVisibilityParameter) + || hasValidBatchParameters(batchAckParameter, batchVisibilityParameter, size); } - private boolean hasValidSingleMessageParameters(boolean batch, boolean batchAckParameter) { - return !batch && !batchAckParameter; + private boolean hasValidSingleMessageParameters(boolean batch, boolean batchAckParameter, + boolean batchVisibilityParameter) { + return !batch && !batchAckParameter && !batchVisibilityParameter; } - private boolean hasValidBatchParameters(boolean batchAckParameter, int size) { - return size == 1 || (size == 2 && batchAckParameter); + private boolean hasValidBatchParameters(boolean batchAckParameter, boolean batchVisibilityParameter, int size) { + long expectedAdditionalParams = Stream.of(batchAckParameter, batchVisibilityParameter).filter(b -> b).count(); + return size == expectedAdditionalParams + 1; } private String getInvalidParametersMessage() { return String.format( "Method %s from class %s in endpoint %s has invalid parameters for batch processing. " + "Batch methods must have a single List parameter, either of Message or T types," - + "and optionally a BatchAcknowledgement or AsyncAcknowledgement parameter.", + + "and optionally BatchAcknowledgement and BatchVisibility parameters.", this.method.getName(), this.method.getDeclaringClass(), this.id); } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchVisibility.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchVisibility.java new file mode 100644 index 000000000..56594b049 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchVisibility.java @@ -0,0 +1,46 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.listener; + +import java.util.concurrent.CompletableFuture; + +/** + * BatchVisibility interface that can be injected as parameter into a listener method. The purpose of this interface is + * to provide a way for the listener methods to extend the visibility timeout of the messages being currently processed. + * + * @author Clement Denis + * @author Tomaz Fernandes + * @since 3.3 + */ +public interface BatchVisibility { + + /** + * Asynchronously changes the provided messages visibility to the provided value. + * @param seconds number of seconds to set the visibility of the provided messages to. + * seconds to set the visibility of provided messages to. + * @return a completable future. + */ + CompletableFuture changeToAsync(int seconds); + + /** + * Changes the provided messages visibility to the provided value. + * @param seconds number of seconds to set the visibility of the provided messages to. + */ + default void changeTo(int seconds) { + changeToAsync(seconds).join(); + } + +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoBatchGroupingStrategy.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoBatchGroupingStrategy.java new file mode 100644 index 000000000..69c4a4a14 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoBatchGroupingStrategy.java @@ -0,0 +1,39 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.listener; + +/** + * Grouping strategy for Fifo SQS with batch listener mode. + * + * @author Alexis SEGURA + * @since 3.1.2 + */ +public enum FifoBatchGroupingStrategy { + + /** + * Default strategy. Group messages in batches by message group. Each batch contains messages from a single message group. + * The order of messages within the group is preserved. As message groups are processed in parallel, this strategy + * provides the maximal throughput. + */ + PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES, + + /** + * Each batch contains messages originating from multiple message groups. The order of messages within each group is + * preserved. Note that FIFO queues do not serve new messages from a group until all the messages for that group in + * the previous batch have been acknowledged or their visibility expired. + */ + PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java index f3d5eb75e..bd97c255a 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java @@ -70,9 +70,16 @@ public MessageSource createMessageSource(SqsContainerOptions options) { @Override public MessageSink createMessageSink(SqsContainerOptions options) { MessageSink deliverySink = createDeliverySink(options.getListenerMode()); - return new MessageGroupingSinkAdapter<>( - maybeWrapWithVisibilityAdapter(deliverySink, options.getMessageVisibility()), - getMessageGroupingFunction()); + MessageSink wrappedDeliverySink = maybeWrapWithVisibilityAdapter(deliverySink, + options.getMessageVisibility()); + return maybeWrapWithMessageGroupingAdapter(options, wrappedDeliverySink); + } + + private MessageSink maybeWrapWithMessageGroupingAdapter(SqsContainerOptions options, MessageSink wrappedDeliverySink) { + return FifoBatchGroupingStrategy.PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES + .equals(options.getFifoBatchGroupingStrategy()) + ? new MessageGroupingSinkAdapter<>(wrappedDeliverySink, getMessageGroupingFunction()) + : wrappedDeliverySink; } // @formatter:off diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/QueueMessageBatchVisibility.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/QueueMessageBatchVisibility.java new file mode 100644 index 000000000..df5556f54 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/QueueMessageBatchVisibility.java @@ -0,0 +1,84 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.listener; + +import io.awspring.cloud.sqs.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse; + +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * {@link BatchVisibility} implementation for SQS messages. + * + * @author Clement Denis + * @author Tomaz Fernandes + * @since 3.3 + */ +public class QueueMessageBatchVisibility implements BatchVisibility { + + private static final Logger logger = LoggerFactory.getLogger(QueueMessageVisibility.class); + + private final SqsAsyncClient sqsAsyncClient; + + private final String queueUrl; + + private final Collection receiptHandles; + + /** + * Create an instance for changing the visibility in batch for the provided queue. + */ + public QueueMessageBatchVisibility(SqsAsyncClient sqsAsyncClient, String queueUrl, Collection receiptHandles) { + this.sqsAsyncClient = sqsAsyncClient; + this.queueUrl = queueUrl; + this.receiptHandles = receiptHandles; + } + + @Override + public CompletableFuture changeToAsync(int seconds) { + return changeToAsyncBatch(seconds); + } + + private CompletableFuture changeToAsyncBatch(int seconds) { + return CompletableFuture.allOf(CollectionUtils.partition(receiptHandles, 10) + .stream() + .map(batch -> doChangeVisibility(seconds, batch) + .thenRun(() -> logger.trace("Changed the visibility of {} message to {} seconds", batch.size(), + seconds))).toArray(CompletableFuture[]::new)); + } + + private CompletableFuture doChangeVisibility(int seconds, Collection batch) { + return sqsAsyncClient.changeMessageVisibilityBatch(req -> req.queueUrl(queueUrl) + .entries(createEntries(seconds, batch))); + } + + private List createEntries(int seconds, Collection batch) { + return batch.stream() + .map(handle -> ChangeMessageVisibilityBatchRequestEntry.builder() + .receiptHandle(handle) + .id(UUID.randomUUID().toString()) + .visibilityTimeout(seconds) + .build()) + .toList(); + } + +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/QueueMessageVisibility.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/QueueMessageVisibility.java index 768759943..c6a704cf1 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/QueueMessageVisibility.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/QueueMessageVisibility.java @@ -15,11 +15,16 @@ */ package io.awspring.cloud.sqs.listener; -import java.util.concurrent.CompletableFuture; +import io.awspring.cloud.sqs.MessageHeaderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.messaging.Message; import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + /** * {@link Visibility} implementation for SQS messages. * @@ -49,6 +54,17 @@ public QueueMessageVisibility(SqsAsyncClient amazonSqsAsync, String queueUrl, St this.receiptHandle = receiptHandle; } + /** + * Create a {@link BatchVisibility} instance with the provided messages' receipt handlers. + * @param messages the messages to populate the {@link BatchVisibility} instance. + * @return {@link BatchVisibility} instance. + */ + public BatchVisibility toBatchVisibility(Collection> messages) { + return new QueueMessageBatchVisibility(this.sqsAsyncClient, this.queueUrl, messages.stream() + .map(message -> MessageHeaderUtils.getHeader(message, SqsHeaders.SQS_RECEIPT_HANDLE_HEADER, String.class)) + .collect(Collectors.toList())); + } + @Override public CompletableFuture changeToAsync(int seconds) { return this.sqsAsyncClient @@ -57,4 +73,5 @@ public CompletableFuture changeToAsync(int seconds) { .thenRun(() -> logger.trace("Changed the visibility of message {} to {} seconds", this.receiptHandle, seconds)); } + } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java index a712ab646..b547d906c 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java @@ -38,6 +38,8 @@ public class SqsContainerOptions extends AbstractContainerOptions queueAttributeNames; private final Collection messageAttributeNames; @@ -57,6 +59,7 @@ protected SqsContainerOptions(BuilderImpl builder) { this.messageSystemAttributeNames = builder.messageSystemAttributeNames; this.messageVisibility = builder.messageVisibility; this.queueNotFoundStrategy = builder.queueNotFoundStrategy; + this.fifoBatchGroupingStrategy = builder.fifoBatchGroupingStrategy; } /** @@ -100,6 +103,15 @@ public Duration getMessageVisibility() { return this.messageVisibility; } + /** + * Get messages grouping strategy in FIFO queues when retrieved by the container in listener mode + * {@link ListenerMode#BATCH}. + * @return the fifo batch message grouping strategy. + */ + public FifoBatchGroupingStrategy getFifoBatchGroupingStrategy() { + return this.fifoBatchGroupingStrategy; + } + /** * Get the {@link QueueNotFoundStrategy} for the container. * @return the strategy. @@ -135,6 +147,8 @@ private static class BuilderImpl private QueueNotFoundStrategy queueNotFoundStrategy = DEFAULT_QUEUE_NOT_FOUND_STRATEGY; + private FifoBatchGroupingStrategy fifoBatchGroupingStrategy = FifoBatchGroupingStrategy.PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES; + @Nullable private Duration messageVisibility; @@ -148,6 +162,7 @@ protected BuilderImpl(SqsContainerOptions options) { this.messageAttributeNames = options.messageAttributeNames; this.messageSystemAttributeNames = options.messageSystemAttributeNames; this.messageVisibility = options.messageVisibility; + this.fifoBatchGroupingStrategy = options.fifoBatchGroupingStrategy; this.queueNotFoundStrategy = options.queueNotFoundStrategy; } @@ -181,6 +196,14 @@ public SqsContainerOptionsBuilder messageVisibility(Duration messageVisibility) return this; } + @Override + public SqsContainerOptionsBuilder fifoBatchGroupingStrategy( + FifoBatchGroupingStrategy fifoBatchGroupingStrategy) { + Assert.notNull(fifoBatchGroupingStrategy, "fifoBatchGroupingStrategy cannot be null"); + this.fifoBatchGroupingStrategy = fifoBatchGroupingStrategy; + return this; + } + @Override public SqsContainerOptionsBuilder queueNotFoundStrategy(QueueNotFoundStrategy queueNotFoundStrategy) { Assert.notNull(queueNotFoundStrategy, "queueNotFoundStrategy cannot be null"); diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptionsBuilder.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptionsBuilder.java index f52f562e3..df1edce7f 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptionsBuilder.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptionsBuilder.java @@ -58,6 +58,15 @@ SqsContainerOptionsBuilder messageSystemAttributeNames( */ SqsContainerOptionsBuilder messageVisibility(Duration messageVisibility); + /** + * Set how the messages from FIFO queues should be grouped when container listener mode is + * {@link ListenerMode#BATCH}. By default, messages are grouped in batches by message group, + * which are processed in parallel, maintaining order within each message group. + * @param fifoBatchGroupingStrategy the strategy to batch FIFO messages. + * @return this instance. + */ + SqsContainerOptionsBuilder fifoBatchGroupingStrategy(FifoBatchGroupingStrategy fifoBatchGroupingStrategy); + /** * Set the {@link QueueNotFoundStrategy} for the container. * @param queueNotFoundStrategy the strategy. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/AbstractOrderingAcknowledgementProcessor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/AbstractOrderingAcknowledgementProcessor.java index 0a1ef765c..d61257345 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/AbstractOrderingAcknowledgementProcessor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/AbstractOrderingAcknowledgementProcessor.java @@ -15,10 +15,10 @@ */ package io.awspring.cloud.sqs.listener.acknowledgement; +import io.awspring.cloud.sqs.CollectionUtils; import io.awspring.cloud.sqs.CompletableFutures; import io.awspring.cloud.sqs.MessageHeaderUtils; import io.awspring.cloud.sqs.listener.ContainerOptions; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -29,7 +29,6 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; @@ -286,16 +285,7 @@ private CompletableFuture logAcknowledgementError(Collection> m private Collection>> partitionMessages(Collection> messagesToAck) { logger.trace("Partitioning {} messages in {}", messagesToAck.size(), this.id); - List> messagesToUse = getMessagesAsList(messagesToAck); - int totalSize = messagesToUse.size(); - return IntStream.rangeClosed(0, (totalSize - 1) / this.maxAcknowledgementsPerBatch) - .mapToObj(index -> messagesToUse.subList(index * this.maxAcknowledgementsPerBatch, - Math.min((index + 1) * this.maxAcknowledgementsPerBatch, totalSize))) - .collect(Collectors.toList()); - } - - private List> getMessagesAsList(Collection> messagesToAck) { - return messagesToAck instanceof List ? (List>) messagesToAck : new ArrayList<>(messagesToAck); + return CollectionUtils.partition(messagesToAck, this.maxAcknowledgementsPerBatch); } protected abstract CompletableFuture doOnAcknowledge(Message message); diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/AbstractMessagingMessageConverter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/AbstractMessagingMessageConverter.java index 9fa4b45fb..1697f1f2d 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/AbstractMessagingMessageConverter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/AbstractMessagingMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2022 the original author or authors. + * Copyright 2013-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,8 @@ import org.springframework.messaging.converter.CompositeMessageConverter; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.converter.SimpleMessageConverter; +import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -39,6 +41,8 @@ * Base {@link MessagingMessageConverter} implementation. * * @author Tomaz Fernandes + * @author Dongha Kim + * * @since 3.0 * @see SqsHeaderMapper * @see SqsMessageConversionContext @@ -215,14 +219,17 @@ public MessageConversionContext createMessageConversionContext() { public S fromMessagingMessage(Message message, @Nullable MessageConversionContext context) { // We must make sure the message id stays consistent throughout this process MessageHeaders headers = getMessageHeaders(message); - Message convertedMessage = Objects.requireNonNull( - this.payloadMessageConverter.toMessage(message.getPayload(), message.getHeaders()), - () -> "payloadMessageConverter returned null message for message " + message); + Message convertedMessage = convertPayload(message, message.getPayload()); MessageHeaders completeHeaders = MessageHeaderUtils.addHeadersIfAbsent(headers, convertedMessage.getHeaders()); S messageWithHeaders = this.headerMapper.fromHeaders(completeHeaders); return doConvertMessage(messageWithHeaders, convertedMessage.getPayload()); } + private Message convertPayload(Message message, Object payload) { + return Objects.requireNonNull(this.payloadMessageConverter.toMessage(payload, message.getHeaders()), + () -> "payloadMessageConverter returned null message for message " + message); + } + private MessageHeaders getMessageHeaders(Message message) { String typeHeaderName = this.payloadTypeHeaderFunction.apply(message); return typeHeaderName != null @@ -234,11 +241,18 @@ private MessageHeaders getMessageHeaders(Message message) { private CompositeMessageConverter createDefaultCompositeMessageConverter() { List messageConverters = new ArrayList<>(); + messageConverters.add(createClassMatchingMessageConverter()); messageConverters.add(createStringMessageConverter()); messageConverters.add(createDefaultMappingJackson2MessageConverter()); return new CompositeMessageConverter(messageConverters); } + private SimpleClassMatchingMessageConverter createClassMatchingMessageConverter() { + SimpleClassMatchingMessageConverter matchingMessageConverter = new SimpleClassMatchingMessageConverter(); + matchingMessageConverter.setSerializedPayloadClass(String.class); + return matchingMessageConverter; + } + private StringMessageConverter createStringMessageConverter() { StringMessageConverter stringMessageConverter = new StringMessageConverter(); stringMessageConverter.setSerializedPayloadClass(String.class); diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SimpleClassMatchingMessageConverter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SimpleClassMatchingMessageConverter.java new file mode 100644 index 000000000..8fd63a76d --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SimpleClassMatchingMessageConverter.java @@ -0,0 +1,35 @@ +package io.awspring.cloud.sqs.support.converter; + +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.AbstractMessageConverter; +import org.springframework.messaging.converter.SmartMessageConverter; + +/** + * {@link SmartMessageConverter} implementation that returns the payload unchanged if the target class + * for Serialization / Deserialization matches the payload class. + * + * @author Tomaz Fernandes + * @since 3.3 + */ +public class SimpleClassMatchingMessageConverter extends AbstractMessageConverter { + + @Override + protected boolean supports(Class clazz) { + return true; + } + + @Nullable + @Override + protected Object convertFromInternal(Message message, Class targetClass, @Nullable Object conversionHint) { + Object payload = message.getPayload(); + return payload.getClass().isAssignableFrom(targetClass) ? payload : null; + } + + @Nullable + @Override + protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) { + return payload.getClass().isAssignableFrom(getSerializedPayloadClass()) ? payload : null; + } +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SnsMessageConverter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SnsMessageConverter.java index 37f0a0dd9..718c0e65b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SnsMessageConverter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SnsMessageConverter.java @@ -17,6 +17,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.List; +import org.springframework.core.GenericTypeResolver; +import org.springframework.core.MethodParameter; +import org.springframework.core.ResolvableType; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -29,6 +35,7 @@ /** * @author Michael Sosa * @author gustavomonarin + * @author Wei Jiang * @since 3.1.1 */ public class SnsMessageConverter implements SmartMessageConverter { @@ -45,10 +52,31 @@ public SnsMessageConverter(MessageConverter payloadConverter, ObjectMapper jsonM } @Override + @SuppressWarnings("unchecked") public Object fromMessage(Message message, Class targetClass, @Nullable Object conversionHint) { Assert.notNull(message, "message must not be null"); Assert.notNull(targetClass, "target class must not be null"); + Object payload = message.getPayload(); + + if (payload instanceof List messages) { + return fromGenericMessages(messages, targetClass, conversionHint); + } + else { + return fromGenericMessage((GenericMessage) message, targetClass, conversionHint); + } + } + + private Object fromGenericMessages(List> messages, Class targetClass, + @Nullable Object conversionHint) { + Type resolvedType = getResolvedType(targetClass, conversionHint); + Class resolvedClazz = ResolvableType.forType(resolvedType).resolve(); + + return messages.stream().map(message -> fromGenericMessage(message, resolvedClazz, resolvedType)).toList(); + } + + private Object fromGenericMessage(GenericMessage message, Class targetClass, + @Nullable Object conversionHint) { JsonNode jsonNode; try { jsonNode = this.jsonMapper.readTree(message.getPayload().toString()); @@ -77,7 +105,7 @@ public Object fromMessage(Message message, Class targetClass, @Nullable Ob ? ((SmartMessageConverter) this.payloadConverter).fromMessage(genericMessage, targetClass, conversionHint) : this.payloadConverter.fromMessage(genericMessage, targetClass); - return new SnsMessageWrapper(jsonNode.path("Subject").asText(), convertedMessage); + return convertedMessage; } @Override @@ -97,10 +125,23 @@ public Message toMessage(Object payload, MessageHeaders headers, Object conve "This converter only supports reading a SNS notification and not writing them"); } - /** - * SNS Message wrapper. - */ - public record SnsMessageWrapper(String subject, Object message) { + private static Type getResolvedType(Class targetClass, @Nullable Object conversionHint) { + if (conversionHint instanceof MethodParameter param) { + param = param.nestedIfOptional(); + if (Message.class.isAssignableFrom(param.getParameterType())) { + param = param.nested(); + } + Type genericParameterType = param.getNestedGenericParameterType(); + Class contextClass = param.getContainingClass(); + Type resolveType = GenericTypeResolver.resolveType(genericParameterType, contextClass); + if (resolveType instanceof ParameterizedType parameterizedType) { + return parameterizedType.getActualTypeArguments()[0]; + } + else { + return resolveType; + } + } + return targetClass; } } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsMessagingMessageConverter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsMessagingMessageConverter.java index 7f6051e8c..67d0879c9 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsMessagingMessageConverter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsMessagingMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2022 the original author or authors. + * Copyright 2013-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ * {@link software.amazon.awssdk.services.sqs.model.Message} instances to Spring Messaging {@link Message} instances. * * @author Tomaz Fernandes + * @author Dongha kim * @since 3.0 * @see SqsHeaderMapper * @see SqsMessageConversionContext diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/resolver/BatchVisibilityHandlerMethodArgumentResolver.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/resolver/BatchVisibilityHandlerMethodArgumentResolver.java new file mode 100644 index 000000000..75f709062 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/resolver/BatchVisibilityHandlerMethodArgumentResolver.java @@ -0,0 +1,64 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.support.resolver; + +import io.awspring.cloud.sqs.MessageHeaderUtils; +import io.awspring.cloud.sqs.listener.BatchVisibility; +import io.awspring.cloud.sqs.listener.QueueMessageVisibility; +import org.springframework.core.MethodParameter; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; + +import java.util.Collection; + +/** + * {@link HandlerMethodArgumentResolver} for {@link BatchVisibility} method parameters. + * + * @author Clement Denis + * @author Tomaz Fernandes + * @since 3.3 + */ +public class BatchVisibilityHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver { + + private final String visibilityHeaderName; + + public BatchVisibilityHandlerMethodArgumentResolver(String visibilityHeaderName) { + this.visibilityHeaderName = visibilityHeaderName; + } + + @Override + public boolean supportsParameter(MethodParameter parameter) { + return ClassUtils.isAssignable(BatchVisibility.class, parameter.getParameterType()); + } + + @SuppressWarnings("unchecked") + @Override + public Object resolveArgument(MethodParameter parameter, Message message) { + + Object payloadObject = message.getPayload(); + Assert.isInstanceOf(Collection.class, payloadObject, "Payload must be instance of Collection"); + Collection> messages = (Collection>) payloadObject; + + QueueMessageVisibility visibilityObject = MessageHeaderUtils.getHeader(messages + .iterator() + .next(), visibilityHeaderName, QueueMessageVisibility.class); + + return visibilityObject.toBatchVisibility(messages); + } + +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/resolver/NotificationMessageArgumentResolver.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/resolver/NotificationMessageArgumentResolver.java index af9bfb389..dd297f217 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/resolver/NotificationMessageArgumentResolver.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/resolver/NotificationMessageArgumentResolver.java @@ -23,11 +23,11 @@ import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; -import org.springframework.util.Assert; /** * @author Michael Sosa * @author gustavomonarin + * @author Wei Jiang * @since 3.1.1 */ public class NotificationMessageArgumentResolver implements HandlerMethodArgumentResolver { @@ -45,10 +45,7 @@ public boolean supportsParameter(MethodParameter parameter) { @Override public Object resolveArgument(MethodParameter par, Message msg) { - Object object = this.converter.fromMessage(msg, par.getParameterType(), par); - Assert.isInstanceOf(SnsMessageConverter.SnsMessageWrapper.class, object); - SnsMessageConverter.SnsMessageWrapper nr = (SnsMessageConverter.SnsMessageWrapper) object; - return nr.message(); + return this.converter.fromMessage(msg, par.getParameterType(), par); } } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsFifoIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsFifoIntegrationTests.java index f2c45600a..ea5c4e49f 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsFifoIntegrationTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsFifoIntegrationTests.java @@ -25,6 +25,7 @@ import io.awspring.cloud.sqs.annotation.SqsListener; import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration; import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; +import io.awspring.cloud.sqs.listener.FifoBatchGroupingStrategy; import io.awspring.cloud.sqs.listener.FifoSqsComponentFactory; import io.awspring.cloud.sqs.listener.ListenerMode; import io.awspring.cloud.sqs.listener.MessageListener; @@ -92,6 +93,8 @@ class SqsFifoIntegrationTests extends BaseSqsIntegrationTest { static final String FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME = "fifo_receives_batches_many_groups.fifo"; + static final String FIFO_RECEIVES_BATCH_GROUPING_STRATEGY_MULTIPLE_GROUPS_IN_SAME_BATCH_QUEUE_NAME = "fifo_receives_batch_grouping_strategy_multiple_groups_in_same_batch.fifo"; + static final String FIFO_MANUALLY_CREATE_CONTAINER_QUEUE_NAME = "fifo_manually_create_container_test_queue.fifo"; static final String FIFO_MANUALLY_CREATE_FACTORY_QUEUE_NAME = "fifo_manually_create_factory_test_queue.fifo"; @@ -141,6 +144,7 @@ static void beforeTests() { createFifoQueue(client, FIFO_STOPS_PROCESSING_ON_ERROR_QUEUE_NAME, getVisibilityAttribute("2")), createFifoQueue(client, FIFO_STOPS_PROCESSING_ON_ACK_ERROR_ERROR_QUEUE_NAME), createFifoQueue(client, FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME), + createFifoQueue(client, FIFO_RECEIVES_BATCH_GROUPING_STRATEGY_MULTIPLE_GROUPS_IN_SAME_BATCH_QUEUE_NAME), createFifoQueue(client, FIFO_MANUALLY_CREATE_CONTAINER_QUEUE_NAME), createFifoQueue(client, FIFO_MANUALLY_CREATE_FACTORY_QUEUE_NAME), createFifoQueue(client, FIFO_MANUALLY_CREATE_BATCH_CONTAINER_QUEUE_NAME), @@ -315,6 +319,55 @@ void receivesBatchesManyGroups() throws Exception { .containsExactlyElementsOf(values); } + @Test + void receivesFifoBatchGroupingStrategyMultipleGroupsInSameBatch() throws Exception { + List values = IntStream.range(0, 2).mapToObj(String::valueOf) + .collect(toList()); + String messageGroupId1 = UUID.randomUUID().toString(); + String messageGroupId2 = UUID.randomUUID().toString(); + List> messages = new ArrayList<>(); + messages.addAll(createMessagesFromValues(messageGroupId1, values)); + messages.addAll(createMessagesFromValues(messageGroupId2, values)); + sqsTemplate.sendMany(FIFO_RECEIVES_BATCH_GROUPING_STRATEGY_MULTIPLE_GROUPS_IN_SAME_BATCH_QUEUE_NAME, messages); + + SqsMessageListenerContainer container = SqsMessageListenerContainer + .builder() + .queueNames(FIFO_RECEIVES_BATCH_GROUPING_STRATEGY_MULTIPLE_GROUPS_IN_SAME_BATCH_QUEUE_NAME) + .messageListener(new MessageListener<>() { + @Override + public void onMessage(Message message) { + throw new UnsupportedOperationException("Batch listener"); + } + + @Override + public void onMessage(Collection> messages) { + assertThat(MessageHeaderUtils + .getHeader(messages, SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, String.class) + .stream().distinct().count()).isEqualTo(2); + latchContainer.receivesFifoBatchGroupingStrategyMultipleGroupsInSameBatchLatch.countDown(); + } + }) + .configure(options -> options + .maxConcurrentMessages(10) + .pollTimeout(Duration.ofSeconds(10)) + .maxMessagesPerPoll(10) + .maxDelayBetweenPolls(Duration.ofSeconds(1)) + .fifoBatchGroupingStrategy(FifoBatchGroupingStrategy.PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH) + .listenerMode(ListenerMode.BATCH)) + .sqsAsyncClient(BaseSqsIntegrationTest.createAsyncClient()) + .build(); + + try { + container.start(); + assertThat(latchContainer.receivesFifoBatchGroupingStrategyMultipleGroupsInSameBatchLatch + .await(settings.latchTimeoutSeconds, TimeUnit.SECONDS)).isTrue(); + } + finally { + container.stop(); + } + + } + @Test void manuallyCreatesContainer() throws Exception { List values = IntStream.range(0, this.settings.messagesPerTest).mapToObj(String::valueOf) @@ -511,6 +564,7 @@ static class LatchContainer { CountDownLatch stopsProcessingOnAckErrorLatch2; CountDownLatch stopsProcessingOnAckErrorHasThrown; CountDownLatch receivesBatchManyGroupsLatch; + CountDownLatch receivesFifoBatchGroupingStrategyMultipleGroupsInSameBatchLatch; LatchContainer(Settings settings) { this.settings = settings; @@ -528,6 +582,7 @@ static class LatchContainer { this.stopsProcessingOnAckErrorLatch1 = new CountDownLatch(1); this.stopsProcessingOnAckErrorLatch2 = new CountDownLatch(1); this.receivesBatchManyGroupsLatch = new CountDownLatch(1); + this.receivesFifoBatchGroupingStrategyMultipleGroupsInSameBatchLatch = new CountDownLatch(1); this.stopsProcessingOnAckErrorHasThrown = new CountDownLatch(1); } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java index d93b531b1..3ec34567b 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java @@ -26,6 +26,7 @@ import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration; import io.awspring.cloud.sqs.config.SqsListenerConfigurer; import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; +import io.awspring.cloud.sqs.listener.BatchVisibility; import io.awspring.cloud.sqs.listener.ContainerComponentFactory; import io.awspring.cloud.sqs.listener.MessageListenerContainer; import io.awspring.cloud.sqs.listener.QueueAttributes; @@ -60,6 +61,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeAll; @@ -132,6 +134,8 @@ class SqsIntegrationTests extends BaseSqsIntegrationTest { static final String MANUAL_ACK_FACTORY = "manualAcknowledgementFactory"; + static final String MANUAL_ACK_BATCH_FACTORY = "manualAcknowledgementBatchFactory"; + static final String ACK_AFTER_SECOND_ERROR_FACTORY = "ackAfterSecondErrorFactory"; @BeforeAll @@ -182,9 +186,9 @@ void receivesMessage() throws Exception { @Test void receivesMessageBatch() throws Exception { - String messageBody = "receivesMessageBatch-payload"; - sqsTemplate.send(RECEIVES_MESSAGE_BATCH_QUEUE_NAME, messageBody); - logger.debug("Sent message to queue {} with messageBody {}", RECEIVES_MESSAGE_BATCH_QUEUE_NAME, messageBody); + List> messages = create10Messages("receivesMessageBatch"); + sqsTemplate.sendMany(RECEIVES_MESSAGE_BATCH_QUEUE_NAME, messages); + logger.debug("Sent 10 messages to queue {}", RECEIVES_MESSAGE_BATCH_QUEUE_NAME); assertThat(latchContainer.receivesMessageBatchLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(latchContainer.acknowledgementCallbackBatchLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -359,11 +363,23 @@ static class ReceivesMessageBatchListener { @Autowired LatchContainer latchContainer; - @SqsListener(queueNames = RECEIVES_MESSAGE_BATCH_QUEUE_NAME, factory = MANUAL_ACK_FACTORY, id = "receivesMessageBatchListener") - CompletableFuture listen(List messages, BatchAcknowledgement acknowledgement) { + AtomicBoolean firstPass = new AtomicBoolean(true); + + @SqsListener(queueNames = RECEIVES_MESSAGE_BATCH_QUEUE_NAME, factory = MANUAL_ACK_BATCH_FACTORY, id = "receivesMessageBatchListener") + CompletableFuture listen(List messages, BatchAcknowledgement acknowledgement, + BatchVisibility visibility) throws Exception { logger.debug("Received messages in listener: " + messages); - latchContainer.receivesMessageBatchLatch.countDown(); - return acknowledgement.acknowledgeAsync(); + + if (firstPass.compareAndSet(true, false)) { + visibility.changeTo(1); + Thread.sleep(1000); + } + else { + acknowledgement.acknowledge(); + } + + messages.forEach(msg -> latchContainer.receivesMessageBatchLatch.countDown()); + return CompletableFuture.completedFuture(null); } } @@ -481,7 +497,7 @@ void listen(String message) throws BrokenBarrierException, InterruptedException static class LatchContainer { final CountDownLatch receivesMessageLatch = new CountDownLatch(1); - final CountDownLatch receivesMessageBatchLatch = new CountDownLatch(1); + final CountDownLatch receivesMessageBatchLatch = new CountDownLatch(20); final CountDownLatch receivesMessageAsyncLatch = new CountDownLatch(1); final CountDownLatch doesNotAckLatch = new CountDownLatch(2); final CountDownLatch doesNotAckAsyncLatch = new CountDownLatch(2); @@ -608,6 +624,30 @@ public void onSuccess(Collection> messages) { .build(); } + @Bean(name = MANUAL_ACK_BATCH_FACTORY) + public SqsMessageListenerContainerFactory manualAcknowledgementBatchFactory() { + return SqsMessageListenerContainerFactory + .builder() + .configure(options -> options + .acknowledgementMode(AcknowledgementMode.MANUAL) + .maxConcurrentMessages(10) + .pollTimeout(Duration.ofSeconds(10)) + .maxMessagesPerPoll(10) + .queueAttributeNames(Collections.singletonList(QueueAttributeName.QUEUE_ARN)) + .maxDelayBetweenPolls(Duration.ofSeconds(10))) + .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient) + .acknowledgementResultCallback(new AcknowledgementResultCallback() { + @Override + public void onSuccess(Collection> messages) { + if (RECEIVES_MESSAGE_BATCH_QUEUE_NAME.equals(MessageHeaderUtils.getHeaderAsString(messages.iterator().next(), + SqsHeaders.SQS_QUEUE_NAME_HEADER))) { + latchContainer.acknowledgementCallbackBatchLatch.countDown(); + } + } + }) + .build(); + } + @Bean public MessageListenerContainer manuallyCreatedContainer() throws Exception { SqsAsyncClient client = BaseSqsIntegrationTest.createAsyncClient(); diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageConversionIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageConversionIntegrationTests.java index 9cf4f6eb2..4ab0132d1 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageConversionIntegrationTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageConversionIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2022 the original author or authors. + * Copyright 2013-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -44,6 +46,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -56,6 +59,8 @@ * * @author Tomaz Fernandes * @author Mikhail Strokov + * @author Dongha Kim + * @author Wei Jiang */ @SpringBootTest class SqsMessageConversionIntegrationTests extends BaseSqsIntegrationTest { @@ -69,6 +74,7 @@ class SqsMessageConversionIntegrationTests extends BaseSqsIntegrationTest { static final String RESOLVES_POJO_FROM_HEADER_QUEUE_NAME = "resolves_pojo_from_mapping_test_queue"; static final String RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME = "resolves_my_other_pojo_from_mapping_test_queue"; static final String RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME = "resolves_pojo_from_notification_message_queue"; + static final String RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME = "resolves_pojo_from_notification_message_list_test_queue"; @Autowired LatchContainer latchContainer; @@ -85,7 +91,8 @@ static void beforeTests() { createQueue(client, RESOLVES_POJO_MESSAGE_LIST_QUEUE_NAME), createQueue(client, RESOLVES_POJO_FROM_HEADER_QUEUE_NAME), createQueue(client, RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME), - createQueue(client, RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME)).join(); + createQueue(client, RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME), + createQueue(client, RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME)).join(); } @Test @@ -151,10 +158,36 @@ void resolvesMyPojoFromNotificationMessage() throws Exception { assertThat(latchContainer.resolvesPojoNotificationMessageLatch.await(10, TimeUnit.SECONDS)).isTrue(); } + @Test + void resolvesMyPojoFromNotificationMessageList() throws Exception { + byte[] notificationJsonContent = FileCopyUtils + .copyToByteArray(getClass().getClassLoader().getResourceAsStream("notificationMessage.json")); + String payload = new String(notificationJsonContent); + List> messages = IntStream.range(0, 10).mapToObj(index -> MessageBuilder.withPayload(payload).build()).toList(); + sqsTemplate.sendMany(RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME, messages); + logger.debug("Sent message to queue {} with messageBody {}", + RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME, payload); + assertThat(latchContainer.resolvesPojoNotificationMessageListLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + private Map getHeaderMapping(Class clazz) { return Collections.singletonMap(SqsHeaders.SQS_DEFAULT_TYPE_HEADER, clazz.getName()); } + @Test + void shouldSendAndReceiveJsonString() throws Exception { + String messageBody = """ + { + "firstField": "hello", + "secondField": "sqs!" + } + """; + sqsTemplate.send(to -> to.queue(RESOLVES_POJO_TYPES_QUEUE_NAME).payload(messageBody).header(MessageHeaders.CONTENT_TYPE, "application/json")); + logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_TYPES_QUEUE_NAME, messageBody); + assertThat(latchContainer.resolvesPojoLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + static class ResolvesPojoListener { @Autowired @@ -247,6 +280,14 @@ void listen(@SnsNotificationMessage MyEnvelope myPojo) { RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_QUEUE_NAME); latchContainer.resolvesPojoNotificationMessageLatch.countDown(); } + + @SqsListener(queueNames = RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME, id = "resolves-pojo-with-notification-message-list", factory = "defaultSqsListenerContainerFactory") + void listen(@SnsNotificationMessage List> myPojos) { + Assert.notEmpty(myPojos, "Received empty messages"); + logger.debug("Received messages {} from queue {}", myPojos, + RESOLVES_POJO_FROM_NOTIFICATION_MESSAGE_LIST_QUEUE_NAME); + latchContainer.resolvesPojoNotificationMessageListLatch.countDown(); + } } static class LatchContainer { @@ -258,6 +299,7 @@ static class LatchContainer { CountDownLatch resolvesPojoFromMappingLatch = new CountDownLatch(1); CountDownLatch resolvesMyOtherPojoFromMappingLatch = new CountDownLatch(1); CountDownLatch resolvesPojoNotificationMessageLatch = new CountDownLatch(1); + CountDownLatch resolvesPojoNotificationMessageListLatch = new CountDownLatch(1); } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsTemplateIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsTemplateIntegrationTests.java index ebaacec1c..f8ae81d9a 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsTemplateIntegrationTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsTemplateIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2023 the original author or authors. + * Copyright 2013-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,24 +15,10 @@ */ package io.awspring.cloud.sqs.integration; -import static org.assertj.core.api.Assertions.assertThat; - import io.awspring.cloud.sqs.listener.SqsHeaders; import io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement; -import io.awspring.cloud.sqs.operations.SendResult; -import io.awspring.cloud.sqs.operations.SqsOperations; -import io.awspring.cloud.sqs.operations.SqsTemplate; -import io.awspring.cloud.sqs.operations.SqsTemplateParameters; -import io.awspring.cloud.sqs.operations.TemplateAcknowledgementMode; +import io.awspring.cloud.sqs.operations.*; import io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.stream.IntStream; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -41,13 +27,22 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StopWatch; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; + /** * @author Tomaz Fernandes + * @author Dongha Kim */ @SpringBootTest public class SqsTemplateIntegrationTests extends BaseSqsIntegrationTest { @@ -78,6 +73,8 @@ public class SqsTemplateIntegrationTests extends BaseSqsIntegrationTest { private static final String HANDLES_CONTENT_DEDUPLICATION_QUEUE_NAME = "handles-content-deduplication-queue.fifo"; + private static final String SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME = "send-receive-json-message-queue"; + @Autowired private SqsAsyncClient asyncClient; @@ -92,7 +89,9 @@ static void beforeTests() { createQueue(client, RECORD_WITHOUT_TYPE_HEADER_QUEUE_NAME), createQueue(client, RETURNS_ON_PARTIAL_BATCH_QUEUE_NAME), createQueue(client, THROWS_ON_PARTIAL_BATCH_QUEUE_NAME), - createQueue(client, SENDS_AND_RECEIVES_MANUAL_ACK_QUEUE_NAME), createQueue(client, EMPTY_QUEUE_NAME), + createQueue(client, SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME), + createQueue(client, SENDS_AND_RECEIVES_MANUAL_ACK_QUEUE_NAME), + createQueue(client, EMPTY_QUEUE_NAME), createFifoQueue(client, SENDS_AND_RECEIVES_MESSAGE_FIFO_QUEUE_NAME), createFifoQueue(client, SENDS_AND_RECEIVES_BATCH_FIFO_QUEUE_NAME), createFifoQueue(client, HANDLES_CONTENT_DEDUPLICATION_QUEUE_NAME, @@ -187,6 +186,27 @@ void shouldSendAndReceiveWithManualAcknowledgement() { assertThat(receivedMessage3).isEmpty(); } + @Test + void shouldSendAndReceiveJsonString() { + SqsOperations template = SqsTemplate.builder() + .sqsAsyncClient(this.asyncClient) + .configureDefaultConverter(AbstractMessagingMessageConverter::doNotSendPayloadTypeHeader) + .buildSyncTemplate(); + String jsonString = """ + { + "propertyOne": "hello", + "propertyTwo": "sqs!" + } + """; + SampleRecord expectedPayload = new SampleRecord("hello", "sqs!"); + SendResult result = template.send(to -> to.queue(SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME) + .payload(jsonString).header(MessageHeaders.CONTENT_TYPE, "application/json")); + assertThat(result).isNotNull(); + Optional> receivedMessage = template + .receive(from -> from.queue(SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME), SampleRecord.class); + assertThat(receivedMessage).isPresent().get().extracting(Message::getPayload).isEqualTo(expectedPayload); + } + @Test void shouldSendAndReceiveBatch() { SqsOperations template = SqsTemplate.builder().sqsAsyncClient(this.asyncClient) diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-dynamodb/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-dynamodb/pom.xml index 35faf9194..aac9bd67b 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter-dynamodb/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-dynamodb/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 ../../pom.xml 4.0.0 diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-metrics/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-metrics/pom.xml index 6771350b4..0f7fe8a23 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter-metrics/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-metrics/pom.xml @@ -22,7 +22,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 ../../pom.xml spring-cloud-aws-starter-metrics diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store/pom.xml index db476a8fd..b5f37d084 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 ../../pom.xml 4.0.0 diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-s3/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-s3/pom.xml index a9ff730ab..3ca9dc3ec 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter-s3/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-s3/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 ../../pom.xml 4.0.0 diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-secrets-manager/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-secrets-manager/pom.xml index 7fe07fd79..49d879c1a 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter-secrets-manager/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-secrets-manager/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 ../../pom.xml 4.0.0 diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-ses/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-ses/pom.xml index 810bfbc16..e5a7273dd 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter-ses/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-ses/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 ../../pom.xml 4.0.0 diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-sns/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-sns/pom.xml index 2ee0c98ce..4eb6cc634 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter-sns/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-sns/pom.xml @@ -6,7 +6,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 ../../pom.xml diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-sqs/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-sqs/pom.xml index c7fa5de32..9c277604a 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter-sqs/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-sqs/pom.xml @@ -6,7 +6,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 ../../pom.xml diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter/pom.xml index b8e7e340c..e05325aa5 100644 --- a/spring-cloud-aws-starters/spring-cloud-aws-starter/pom.xml +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter/pom.xml @@ -5,7 +5,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 ../../pom.xml 4.0.0 diff --git a/spring-cloud-aws-test/pom.xml b/spring-cloud-aws-test/pom.xml index aab701fab..235583993 100644 --- a/spring-cloud-aws-test/pom.xml +++ b/spring-cloud-aws-test/pom.xml @@ -22,7 +22,7 @@ io.awspring.cloud spring-cloud-aws - 3.2.0-SNAPSHOT + 3.2.0 spring-cloud-aws-test Spring Cloud AWS Test diff --git a/spring-cloud-aws-testcontainers/pom.xml b/spring-cloud-aws-testcontainers/pom.xml index 8a28f2b3b..d9f7dc453 100644 --- a/spring-cloud-aws-testcontainers/pom.xml +++ b/spring-cloud-aws-testcontainers/pom.xml @@ -6,7 +6,7 @@ spring-cloud-aws io.awspring.cloud - 3.2.0-SNAPSHOT + 3.2.0 4.0.0