Skip to content

Commit

Permalink
Merge branch 'main' into S3-kms-support
Browse files Browse the repository at this point in the history
  • Loading branch information
MatejNedic committed Sep 17, 2024
2 parents 78ef972 + 5bee9c7 commit 932ffa0
Show file tree
Hide file tree
Showing 83 changed files with 1,346 additions and 149 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/upload-docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ jobs:
with:
candidate: java
version: '17.0.1-tem'
- name: Set up JDK 8
uses: actions/setup-java@v4
- name: Set up JDK
uses: actions/setup-java@v1
with:
java-version: 8
distribution: 'adopt'
cache: 'maven'
jdkFile: ${{ steps.sdkman.outputs.file }}
- uses: actions/cache@v1
with:
path: ~/.m2/repository
Expand Down
2 changes: 1 addition & 1 deletion docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.0</version>
</parent>
<artifactId>spring-cloud-aws-docs</artifactId>
<packaging>pom</packaging>
Expand Down
7 changes: 5 additions & 2 deletions docs/src/main/asciidoc/_configprops.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
|spring.cloud.aws.cloudwatch.region | | Overrides the default region.
|spring.cloud.aws.credentials.access-key | | The access key to be used with a static provider.
|spring.cloud.aws.credentials.instance-profile | `+++false+++` | Configures an instance profile credentials provider with no further configuration.
|spring.cloud.aws.credentials.profile | | The AWS profile.
|spring.cloud.aws.credentials.profile.name | | Profile name.
|spring.cloud.aws.credentials.profile.path | | Profile file path.
|spring.cloud.aws.credentials.secret-key | | The secret key to be used with a static provider.
|spring.cloud.aws.credentials.sts.async-credentials-update | `+++false+++` | Enables provider to asynchronously fetch credentials in the background. Defaults to synchronous blocking if not specified otherwise.
|spring.cloud.aws.credentials.sts.role-arn | | ARN of IAM role associated with STS. If not provided this will be read from {@link software.amazon.awssdk.core.SdkSystemSetting}.
Expand Down Expand Up @@ -39,7 +40,8 @@
|spring.cloud.aws.parameterstore.reload.period | `+++1m+++` | Refresh period for {@link PollingAwsPropertySourceChangeDetector}.
|spring.cloud.aws.parameterstore.reload.strategy | | Reload strategy to run when properties change.
|spring.cloud.aws.region.instance-profile | `+++false+++` | Configures an instance profile region provider with no further configuration.
|spring.cloud.aws.region.profile | | The AWS profile.
|spring.cloud.aws.region.profile.name | | Profile name.
|spring.cloud.aws.region.profile.path | | Profile file path.
|spring.cloud.aws.region.static | |
|spring.cloud.aws.s3.accelerate-mode-enabled | | Option to enable using the accelerate endpoint when accessing S3. Accelerate endpoints allow faster transfer of objects by using Amazon CloudFront's globally distributed edge locations.
|spring.cloud.aws.s3.checksum-validation-enabled | | Option to disable doing a validation of the checksum of an object stored in S3.
Expand Down Expand Up @@ -76,6 +78,7 @@
|spring.cloud.aws.sqs.listener.max-concurrent-messages | | The maximum concurrent messages that can be processed simultaneously for each queue. Note that if acknowledgement batching is being used, the actual maximum number of messages inflight might be higher.
|spring.cloud.aws.sqs.listener.max-messages-per-poll | | The maximum number of messages to be retrieved in a single poll to SQS.
|spring.cloud.aws.sqs.listener.poll-timeout | | The maximum amount of time for a poll to SQS.
|spring.cloud.aws.sqs.queue-not-found-strategy | |
|spring.cloud.aws.sqs.region | | Overrides the default region.

|===
34 changes: 34 additions & 0 deletions docs/src/main/asciidoc/docker-compose.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[#spring-cloud-aws-docker-compose]
== Docker Compose

Spring Cloud AWS provides Docker Compose support for https://docs.localstack.cloud/references/docker-images/[LocalStack docker images] which simplifies local development of Spring Cloud AWS based projects.

Maven coordinates, using <<index.adoc#bill-of-materials, Spring Cloud AWS BOM>>:

[source,xml]
----
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-docker-compose</artifactId>
</dependency>
----

For more information about Spring Docker Compose support please refer to https://docs.spring.io/spring-boot/reference/features/docker-compose.html[official Spring documentation]

=== Example docker-compose.yaml file

[source,yaml]
----
services:
localstack:
image: localstack/localstack
environment:
AWS_ACCESS_KEY_ID: noop
AWS_SECRET_ACCESS_KEY: noop
AWS_DEFAULT_REGION: eu-central-1
ports:
- "4566:4566"
----

`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_DEFAULT_REGION` are required environment variables to ensure proper integration.

2 changes: 2 additions & 0 deletions docs/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ include::spring-modulith.adoc[]

include::testing.adoc[]

include::docker-compose.adoc[]

include::migration.adoc[]

== Configuration properties
Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/asciidoc/s3.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ s3Template.upload(BUCKET, "file.txt", is, ObjectMetadata.builder().contentType("
Another feature of `S3Template` is the ability to generate signed URLs for getting/putting S3 objects in a single method call.
[source,java]
----
URL signedGetUrl = s3Template.createSignedGetUrl("bucket_name", "file.txt", Duration.ofMinutes(5));
URL signedGetUrl = s3Template.createSignedGetURL("bucket_name", "file.txt", Duration.ofMinutes(5));
----

`S3Template` also allows storing & retrieving Java objects.
Expand Down
32 changes: 27 additions & 5 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,9 @@ NOTE: Queues declared in the same annotation will share the container, though ea
===== SNS Messages

Since 3.1.1, when receiving SNS messages through the `@SqsListener`, the message includes all attributes of the `SnsNotification`. To only receive need the `Message` part of the payload, you can utilize the `@SnsNotificationMessage` annotation.

For handling individual message processing, the @SnsNotificationMessage annotation can be used in the following manner:

[source, java]
----
@SqsListener("my-queue")
Expand All @@ -626,6 +629,16 @@ public void listen(@SnsNotificationMessage Pojo pojo) {
}
----

For batch message processing, use the @SnsNotificationMessage annotation with a List<Pojo> parameter.

[source, java]
----
@SqsListener("my-queue")
public void listen(@SnsNotificationMessage List<Pojo> pojos) {
System.out.println(pojos.size());
}
----

===== Specifying a MessageListenerContainerFactory
A `MessageListenerContainerFactory` can be specified through the `factory` property.
Such factory will then be used to create the container for the annotated method.
Expand Down Expand Up @@ -677,6 +690,7 @@ AcknowledgementMode must be set to `MANUAL` (see <<Acknowledging Messages>>)
- `BatchAcknowledgement` - provides methods for manually acknowledging partial or whole message batches for batch listeners.
AcknowledgementMode must be set to `MANUAL` (see <<Acknowledging Messages>>)
- `Visibility` - provides the `changeTo()` method that enables changing the message's visibility to the provided value.
- `BatchVisibility` - provides `changeTo()` methods that enables changing partial or whole message batches visibility to the provided value.
- `QueueAttributes` - provides queue attributes for the queue that received the message.
See <<Retrieving Attributes from SQS>> for how to specify the queue attributes that will be fetched from `SQS`
- `software.amazon.awssdk.services.sqs.model.Message` - provides the original `Message` from `SQS`
Expand All @@ -699,7 +713,7 @@ public void listen(Message<MyPojo> message, MyPojo pojo, MessageHeaders headers,
}
----

IMPORTANT: Batch listeners support a single `List<MyPojo>` and `List<Message<MyPojo>>` method arguments, and an optional `BatchAcknowledgement` or `AsyncBatchAcknowledgement` arguments.
IMPORTANT: Batch listeners support a single `List<MyPojo>` and `List<Message<MyPojo>>` method arguments, and optional `BatchAcknowledgement` (or `AsyncBatchAcknowledgement`) and `BatchVisibility` arguments.
`MessageHeaders` should be extracted from the `Message` instances through the `getHeaders()` method.

==== Batch Processing
Expand All @@ -711,7 +725,9 @@ When batch mode is enabled, the framework will serve the entire result of a poll
If a value greater than 10 is provided for `maxMessagesPerPoll`, the result of multiple polls will be combined and up to the respective amount of messages will be served to the listener.

To enable batch processing using `@SqsListener`, a single `List<MyPojo>` or `List<Message<MyPojo>>` method argument should be provided in the listener method.
The listener method can also have an optional `BatchAcknowledgement` argument for `AcknowledgementMode.MANUAL`.
The listener method can also have:
- an optional `BatchAcknowledgement` argument for `AcknowledgementMode.MANUAL`
- an optional `BatchVisibility` argument

Alternatively, `SqsContainerOptions` can be set to `ListenerMode.BATCH` in the `SqsContainerOptions` in the factory or container.

Expand Down Expand Up @@ -786,6 +802,7 @@ The Spring Boot Starter for SQS provides the following auto-configuration proper
| <<maxConcurrentMessages, `spring.cloud.aws.sqs.listener.max-inflight-messages-per-queue`>> | Maximum number of inflight messages per queue. | No | 10
| <<maxMessagesPerPoll, `spring.cloud.aws.sqs.listener.max-messages-per-poll`>> | Maximum number of messages to be received per poll. | No | 10
| <<pollTimeout, `spring.cloud.aws.sqs.listener.poll-timeout`>> | Maximum amount of time to wait for messages in a poll. | No | 10 seconds
| `spring.cloud.aws.sqs.queue-not-found-strategy` | The strategy to be used by SqsTemplate and SqsListeners when a queue does not exist. | No | CREATE
|===


Expand Down Expand Up @@ -884,6 +901,12 @@ See <<Retrieving Attributes from SQS>>.
|Configures the `MessageSystemAttribute` that will be retrieved from SQS for each message.
See <<Retrieving Attributes from SQS>>.

|`fifoBatchGroupingStrategy`
|`PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES`, `PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH`
|`PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES`
|Specifies how messages from FIFO queues should be grouped when retrieved by the container when listener
mode is `batch`. See <<FIFO Support>>.

|`messageConverter`
|`MessagingMessageConverter`
|`SqsMessagingMessageConverter`
Expand Down Expand Up @@ -1029,10 +1052,9 @@ NOTE: Spring-managed `MessageListenerContainer` beans' lifecycle actions are alw

* Messages are polled with a `receiveRequestAttemptId`, and the received batch of messages is split according to the message`s `MessageGroupId`.
* Each message from a given group will then be processed in order, while each group is processed in parallel.
* If processing fails for a message, the following messages from the same message group are discarded so they will be served again after their `message visibility`
expires.
* To receive messages from multiple groups in a `batch`, set `fifoBatchGroupingStrategy` to `PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH` in `SqsContainerOptions`.
* If processing fails for a message, the following messages from the same message group are discarded so they will be served again after their `message visibility` expires.
* Messages which were already successfully processed and acknowledged will not be served again.
* If a `batch` listener is used, each message group from a poll will be served as a batch to the listener method.
* `FIFO` queues also have different defaults for acknowledging messages, see <<Acknowledgement Defaults>> for more information.
* If a `message visibility` is set through `@SqsListener` or `SqsContainerOptions`, visibility will be extended for all messages in the message group before each message is processed.

Expand Down
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>4.1.0</version>
<version>4.1.3</version>
<relativePath/><!-- lookup parent from repository -->
</parent>

<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.0</version>
<packaging>pom</packaging>
<name>Spring Cloud AWS</name>

Expand All @@ -37,6 +37,7 @@
<module>spring-cloud-aws-core</module>
<module>spring-cloud-aws-autoconfigure</module>
<module>spring-cloud-aws-dependencies</module>
<module>spring-cloud-aws-docker-compose</module>
<module>spring-cloud-aws-parameter-store</module>
<module>spring-cloud-aws-secrets-manager</module>
<module>spring-cloud-aws-ses</module>
Expand Down
2 changes: 1 addition & 1 deletion spring-cloud-aws-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.0</version>
</parent>

<artifactId>spring-cloud-aws-autoconfigure</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2022 the original author or authors.
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,13 +24,14 @@
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand All @@ -45,12 +46,15 @@
import org.springframework.context.annotation.Import;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
import software.amazon.awssdk.services.sqs.model.Message;

/**
* {@link EnableAutoConfiguration Auto-configuration} for SQS integration.
*
* @author Tomaz Fernandes
* @author Maciej Walkowiak
* @author Wei Jiang
* @author Dongha Kim
* @since 3.0
*/
@AutoConfiguration
Expand Down Expand Up @@ -78,10 +82,12 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder

@ConditionalOnMissingBean
@Bean
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider) {
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient);
objectMapperProvider
.ifAvailable(om -> builder.configureDefaultConverter(converter -> converter.setObjectMapper(om)));
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider, MessagingMessageConverter<Message> messageConverter) {
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient).messageConverter(messageConverter);
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messageConverter, om));
if (sqsProperties.getQueueNotFoundStrategy() != null) {
builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy()));
}
return builder.build();
}

Expand All @@ -92,28 +98,36 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
ObjectProvider<ErrorHandler<Object>> errorHandler,
ObjectProvider<AsyncMessageInterceptor<Object>> asyncInterceptors,
ObjectProvider<MessageInterceptor<Object>> interceptors,
ObjectProvider<ObjectMapper> objectMapperProvider) {
ObjectProvider<ObjectMapper> objectMapperProvider,
MessagingMessageConverter<?> messagingMessageConverter) {

SqsMessageListenerContainerFactory<Object> factory = new SqsMessageListenerContainerFactory<>();
factory.configure(this::configureContainerOptions);
factory.configure(this::configureProperties);
sqsAsyncClient.ifAvailable(factory::setSqsAsyncClient);
asyncErrorHandler.ifAvailable(factory::setErrorHandler);
errorHandler.ifAvailable(factory::setErrorHandler);
interceptors.forEach(factory::addMessageInterceptor);
asyncInterceptors.forEach(factory::addMessageInterceptor);
objectMapperProvider.ifAvailable(objectMapper -> setObjectMapper(factory, objectMapper));
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messagingMessageConverter, om));
factory.configure(options -> options.messageConverter(messagingMessageConverter));
return factory;
}

private void setObjectMapper(SqsMessageListenerContainerFactory<Object> factory, ObjectMapper objectMapper) {
// Object Mapper for early deserialization in MessageSource
var messageConverter = new SqsMessagingMessageConverter();
messageConverter.setObjectMapper(objectMapper);
factory.configure(options -> options.messageConverter(messageConverter));
private void setMapperToConverter(MessagingMessageConverter<?> messagingMessageConverter, ObjectMapper om) {
if (messagingMessageConverter instanceof SqsMessagingMessageConverter sqsConverter) {
sqsConverter.setObjectMapper(om);
}
}

private void configureContainerOptions(ContainerOptionsBuilder<?, ?> options) {
@ConditionalOnMissingBean
@Bean
public MessagingMessageConverter<Message> messageConverter() {
return new SqsMessagingMessageConverter();
}

private void configureProperties(SqsContainerOptionsBuilder options) {
PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull();
mapper.from(this.sqsProperties.getQueueNotFoundStrategy()).to(options::queueNotFoundStrategy);
mapper.from(this.sqsProperties.getListener().getMaxConcurrentMessages()).to(options::maxConcurrentMessages);
mapper.from(this.sqsProperties.getListener().getMaxMessagesPerPoll()).to(options::maxMessagesPerPoll);
mapper.from(this.sqsProperties.getListener().getPollTimeout()).to(options::pollTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.awspring.cloud.autoconfigure.sqs;

import io.awspring.cloud.autoconfigure.AwsClientProperties;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;

import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.lang.Nullable;
Expand All @@ -24,6 +26,7 @@
* Properties related to AWS SQS.
*
* @author Tomaz Fernandes
* @author Wei Jiang
* @since 3.0
*/
@ConfigurationProperties(prefix = SqsProperties.PREFIX)
Expand All @@ -44,6 +47,26 @@ public void setListener(Listener listener) {
this.listener = listener;
}

@Nullable
private QueueNotFoundStrategy queueNotFoundStrategy;

/**
* Return the strategy to use if the queue is not found.
* @return the {@link QueueNotFoundStrategy}
*/
@Nullable
public QueueNotFoundStrategy getQueueNotFoundStrategy() {
return queueNotFoundStrategy;
}

/**
* Set the strategy to use if the queue is not found.
* @param queueNotFoundStrategy the strategy to set.
*/
public void setQueueNotFoundStrategy(QueueNotFoundStrategy queueNotFoundStrategy) {
this.queueNotFoundStrategy = queueNotFoundStrategy;
}

public static class Listener {

/**
Expand Down
Loading

0 comments on commit 932ffa0

Please sign in to comment.