Skip to content

Commit

Permalink
Formatting. (#1213)
Browse files Browse the repository at this point in the history
  • Loading branch information
maciejwalkowiak committed Sep 17, 2024
1 parent 4881ba9 commit 9ea1208
Show file tree
Hide file tree
Showing 24 changed files with 177 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
Expand Down Expand Up @@ -82,8 +82,10 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder

@ConditionalOnMissingBean
@Bean
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider, MessagingMessageConverter<Message> messageConverter) {
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient).messageConverter(messageConverter);
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider,
MessagingMessageConverter<Message> messageConverter) {
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient)
.messageConverter(messageConverter);
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messageConverter, om));
if (sqsProperties.getQueueNotFoundStrategy() != null) {
builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy()));
Expand All @@ -97,8 +99,7 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
ObjectProvider<SqsAsyncClient> sqsAsyncClient, ObjectProvider<AsyncErrorHandler<Object>> asyncErrorHandler,
ObjectProvider<ErrorHandler<Object>> errorHandler,
ObjectProvider<AsyncMessageInterceptor<Object>> asyncInterceptors,
ObjectProvider<MessageInterceptor<Object>> interceptors,
ObjectProvider<ObjectMapper> objectMapperProvider,
ObjectProvider<MessageInterceptor<Object>> interceptors, ObjectProvider<ObjectMapper> objectMapperProvider,
MessagingMessageConverter<?> messagingMessageConverter) {

SqsMessageListenerContainerFactory<Object> factory = new SqsMessageListenerContainerFactory<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.awspring.cloud.autoconfigure.AwsClientProperties;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;

import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.lang.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
import io.awspring.cloud.sqs.QueueAttributesResolvingException;
import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import org.testcontainers.shaded.org.bouncycastle.util.Arrays;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;

import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -41,7 +38,9 @@
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.bouncycastle.util.Arrays;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;

/**
* Integration tests for {@link SqsAutoConfiguration}.
Expand All @@ -59,28 +58,26 @@ class SqsAutoConfigurationIntegrationTest {

@Container
static LocalStackContainer localstack = new LocalStackContainer(
DockerImageName.parse("localstack/localstack:3.2.0"));
DockerImageName.parse("localstack/localstack:3.2.0"));

static {
localstack.start();
}

private static final String[] BASE_PARAMS = {"spring.cloud.aws.sqs.region=eu-west-1",
"spring.cloud.aws.sqs.endpoint=" + localstack.getEndpoint(),
"spring.cloud.aws.credentials.access-key=noop", "spring.cloud.aws.credentials.secret-key=noop",
"spring.cloud.aws.region.static=eu-west-1"};
private static final String[] BASE_PARAMS = { "spring.cloud.aws.sqs.region=eu-west-1",
"spring.cloud.aws.sqs.endpoint=" + localstack.getEndpoint(), "spring.cloud.aws.credentials.access-key=noop",
"spring.cloud.aws.credentials.secret-key=noop", "spring.cloud.aws.region.static=eu-west-1" };

private static final AutoConfigurations BASE_CONFIGURATIONS = AutoConfigurations.of(RegionProviderAutoConfiguration.class,
CredentialsProviderAutoConfiguration.class, SqsAutoConfiguration.class, AwsAutoConfiguration.class,
ListenerConfiguration.class);
private static final AutoConfigurations BASE_CONFIGURATIONS = AutoConfigurations.of(
RegionProviderAutoConfiguration.class, CredentialsProviderAutoConfiguration.class,
SqsAutoConfiguration.class, AwsAutoConfiguration.class, ListenerConfiguration.class);

private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues(BASE_PARAMS)
.withConfiguration(BASE_CONFIGURATIONS);
.withPropertyValues(BASE_PARAMS).withConfiguration(BASE_CONFIGURATIONS);

private final ApplicationContextRunner applicationContextRunnerWithFailStrategy = new ApplicationContextRunner()
.withPropertyValues(Arrays.append(BASE_PARAMS, "spring.cloud.aws.sqs.queue-not-found-strategy=fail"))
.withConfiguration(BASE_CONFIGURATIONS);
.withPropertyValues(Arrays.append(BASE_PARAMS, "spring.cloud.aws.sqs.queue-not-found-strategy=fail"))
.withConfiguration(BASE_CONFIGURATIONS);

@SuppressWarnings("unchecked")
@Test
Expand All @@ -95,21 +92,22 @@ void sendsAndReceivesMessage() {

@Test
void containerReceivesMessageWithFailQueueNotFoundStrategy() {
applicationContextRunnerWithFailStrategy.run(context ->
assertThatThrownBy(() -> context.getBean(SqsTemplate.class).send(to -> to.queue("QUEUE_DOES_NOT_EXISTS").payload(PAYLOAD)))
.isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class).cause()
.isInstanceOf(CompletionException.class).cause().isInstanceOf(QueueAttributesResolvingException.class)
.cause().isInstanceOf(QueueDoesNotExistException.class));
applicationContextRunnerWithFailStrategy.run(context -> assertThatThrownBy(
() -> context.getBean(SqsTemplate.class).send(to -> to.queue("QUEUE_DOES_NOT_EXISTS").payload(PAYLOAD)))
.isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class)
.cause().isInstanceOf(CompletionException.class).cause()
.isInstanceOf(QueueAttributesResolvingException.class).cause()
.isInstanceOf(QueueDoesNotExistException.class));
}

@Test
void templateReceivesMessageWithFailQueueNotFoundStrategy() {
applicationContextRunnerWithFailStrategy
.run(context ->
assertThatThrownBy(() -> context.getBean(SqsTemplate.class).receive("QUEUE_DOES_NOT_EXIST", String.class))
.isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class).cause()
.isInstanceOf(CompletionException.class).cause().isInstanceOf(QueueAttributesResolvingException.class)
.cause().isInstanceOf(QueueDoesNotExistException.class));
applicationContextRunnerWithFailStrategy.run(context -> assertThatThrownBy(
() -> context.getBean(SqsTemplate.class).receive("QUEUE_DOES_NOT_EXIST", String.class))
.isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class)
.cause().isInstanceOf(CompletionException.class).cause()
.isInstanceOf(QueueAttributesResolvingException.class).cause()
.isInstanceOf(QueueDoesNotExistException.class));
}

static class Listener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,9 @@
import io.awspring.cloud.sqs.config.EndpointRegistrar;
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.AbstractContainerOptions;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
import io.awspring.cloud.sqs.listener.MessageListenerContainerRegistry;
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.operations.SqsTemplate;
Expand Down Expand Up @@ -229,23 +226,19 @@ void configuresObjectMapper() {
void configuresMessageConverter() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
.withUserConfiguration(ObjectMapperConfiguration.class, MessageConverterConfiguration.class)
.run(context -> {
SqsTemplate sqsTemplate = context.getBean("sqsTemplate", SqsTemplate.class);
SqsMessageListenerContainerFactory<?> factory = context.getBean("defaultSqsListenerContainerFactory", SqsMessageListenerContainerFactory.class);
ObjectMapper objectMapper = context.getBean(CUSTOM_OBJECT_MAPPER_BEAN_NAME, ObjectMapper.class);
SqsMessagingMessageConverter converter = context.getBean(CUSTOM_MESSAGE_CONVERTER_BEAN_NAME, SqsMessagingMessageConverter.class);
assertThat(converter.getPayloadMessageConverter())
.extracting("converters")
.asList()
.filteredOn(conv -> conv instanceof MappingJackson2MessageConverter)
.first()
.extracting("objectMapper")
.isEqualTo(objectMapper);
assertThat(sqsTemplate).extracting("messageConverter").isEqualTo(converter);
assertThat(factory)
.extracting("containerOptionsBuilder")
.extracting("messageConverter")
.isEqualTo(converter);
.run(context -> {
SqsTemplate sqsTemplate = context.getBean("sqsTemplate", SqsTemplate.class);
SqsMessageListenerContainerFactory<?> factory = context
.getBean("defaultSqsListenerContainerFactory", SqsMessageListenerContainerFactory.class);
ObjectMapper objectMapper = context.getBean(CUSTOM_OBJECT_MAPPER_BEAN_NAME, ObjectMapper.class);
SqsMessagingMessageConverter converter = context.getBean(CUSTOM_MESSAGE_CONVERTER_BEAN_NAME,
SqsMessagingMessageConverter.class);
assertThat(converter.getPayloadMessageConverter()).extracting("converters").asList()
.filteredOn(conv -> conv instanceof MappingJackson2MessageConverter).first()
.extracting("objectMapper").isEqualTo(objectMapper);
assertThat(sqsTemplate).extracting("messageConverter").isEqualTo(converter);
assertThat(factory).extracting("containerOptionsBuilder").extracting("messageConverter")
.isEqualTo(converter);
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
/*
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.docker.compose;

import static org.assertj.core.api.Assertions.assertThat;

import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails;
import java.io.IOException;
import java.net.URI;
import java.util.LinkedHashMap;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

import java.io.IOException;
import java.net.URI;
import java.util.LinkedHashMap;

import static org.assertj.core.api.Assertions.assertThat;

class AwsDockerComposeConnectionDetailsFactoryTest {

private final Resource dockerComposeResource = new ClassPathResource("docker-compose.yaml");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public interface DynamoDbOperations {
* @param clazz Class of entity being deleted so {@link software.amazon.awssdk.enhanced.dynamodb.TableSchema} can be
* generated.
*/
<T> T delete(Key key, Class<T> clazz);
<T> T delete(Key key, Class<T> clazz);

/**
* Deletes a record for a given Entity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public <T> T update(T entity) {
public <T> T delete(Key key, Class<T> clazz) {
Assert.notNull(key, "key is required");
Assert.notNull(clazz, "clazz is required");
return prepareTable(clazz).deleteItem(key);
return prepareTable(clazz).deleteItem(key);
}

public <T> T delete(T entity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ void returnsResourceUrl(S3OutputStreamProvider s3OutputStreamProvider) throws IO
@TestAvailableOutputStreamProviders
void returnsEmptyUrlToBucketWhenObjectIsEmpty(S3OutputStreamProvider s3OutputStreamProvider) throws IOException {
S3Resource resource = s3Resource("s3://first-bucket/", s3OutputStreamProvider);
assertThat(resource.getURL().toString())
.isEqualTo("https://first-bucket.s3.amazonaws.com/");
assertThat(resource.getURL().toString()).isEqualTo("https://first-bucket.s3.amazonaws.com/");
}

@TestAvailableOutputStreamProviders
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,15 @@ void testSendSimpleMailWithNoTo() {

ArgumentCaptor<SendEmailRequest> request = ArgumentCaptor.forClass(SendEmailRequest.class);
when(emailService.sendEmail(request.capture()))
.thenReturn(SendEmailResponse.builder().messageId("123").build());
.thenReturn(SendEmailResponse.builder().messageId("123").build());

mailSender.send(simpleMailMessage);

SendEmailRequest sendEmailRequest = request.getValue();
assertThat(sendEmailRequest.message().subject().data()).isEqualTo(simpleMailMessage.getSubject());
assertThat(sendEmailRequest.message().body().text().data()).isEqualTo(simpleMailMessage.getText());
assertThat(sendEmailRequest.destination().bccAddresses().get(0))
.isEqualTo(Objects.requireNonNull(simpleMailMessage.getBcc())[0]);
.isEqualTo(Objects.requireNonNull(simpleMailMessage.getBcc())[0]);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ protected String getMessageListenerContainerRegistryBeanName() {
@Override
protected Collection<HandlerMethodArgumentResolver> createAdditionalArgumentResolvers() {
return Arrays.asList(new VisibilityHandlerMethodArgumentResolver(SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER),
new BatchVisibilityHandlerMethodArgumentResolver(SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER), new SqsMessageMethodArgumentResolver(),
new QueueAttributesMethodArgumentResolver());
new BatchVisibilityHandlerMethodArgumentResolver(SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER),
new SqsMessageMethodArgumentResolver(), new QueueAttributesMethodArgumentResolver());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public interface BatchVisibility {

/**
* Asynchronously changes the provided messages visibility to the provided value.
* @param seconds number of seconds to set the visibility of the provided messages to.
* seconds to set the visibility of provided messages to.
* @param seconds number of seconds to set the visibility of the provided messages to. seconds to set the visibility
* of provided messages to.
* @return a completable future.
*/
CompletableFuture<Void> changeToAsync(int seconds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
public enum FifoBatchGroupingStrategy {

/**
* Default strategy. Group messages in batches by message group. Each batch contains messages from a single message group.
* The order of messages within the group is preserved. As message groups are processed in parallel, this strategy
* provides the maximal throughput.
* Default strategy. Group messages in batches by message group. Each batch contains messages from a single message
* group. The order of messages within the group is preserved. As message groups are processed in parallel, this
* strategy provides the maximal throughput.
*/
PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ public MessageSink<T> createMessageSink(SqsContainerOptions options) {
return maybeWrapWithMessageGroupingAdapter(options, wrappedDeliverySink);
}

private MessageSink<T> maybeWrapWithMessageGroupingAdapter(SqsContainerOptions options, MessageSink<T> wrappedDeliverySink) {
private MessageSink<T> maybeWrapWithMessageGroupingAdapter(SqsContainerOptions options,
MessageSink<T> wrappedDeliverySink) {
return FifoBatchGroupingStrategy.PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES
.equals(options.getFifoBatchGroupingStrategy())
? new MessageGroupingSinkAdapter<>(wrappedDeliverySink, getMessageGroupingFunction())
: wrappedDeliverySink;
.equals(options.getFifoBatchGroupingStrategy())
? new MessageGroupingSinkAdapter<>(wrappedDeliverySink, getMessageGroupingFunction())
: wrappedDeliverySink;
}

// @formatter:off
Expand Down
Loading

0 comments on commit 9ea1208

Please sign in to comment.