From 5946c95a17eac0c9d4fdca507a66dda0f9e87d97 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Thu, 29 Aug 2024 23:39:50 -0500 Subject: [PATCH] Move default subscription type to factory (#818) This commit moves the default subscription type from the `@PulsarListener` and `@ReactivePulsarListener` annotation to the associated container factory (props) which allows the Spring Boot `spring.pulsar.consumer.subscription-type` config prop to be respected. See https://github.com/spring-projects/spring-boot/issues/42053 --- .../reference/pulsar/message-consumption.adoc | 3 - ...eactivePulsarListenerContainerFactory.java | 7 +- .../annotation/ReactivePulsarListener.java | 2 +- ...vePulsarListenerContainerFactoryTests.java | 81 +++++++++ .../listener/ReactivePulsarListenerTests.java | 154 +++++++----------- .../pulsar/annotation/PulsarListener.java | 2 +- ...currentPulsarListenerContainerFactory.java | 7 + ...DefaultPulsarMessageListenerContainer.java | 8 +- ...ntPulsarMessageListenerContainerTests.java | 45 +++++ .../pulsar/listener/PulsarListenerTests.java | 128 ++++++--------- 10 files changed, 249 insertions(+), 188 deletions(-) create mode 100644 spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar/message-consumption.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar/message-consumption.adoc index 39daa6873..3546702aa 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar/message-consumption.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar/message-consumption.adoc @@ -15,9 +15,6 @@ Spring Boot provides this consumer factory which you can further configure by sp TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is instead generated when not specified on the annotation. -TIP: The `spring.pulsar.consumer.subscription.type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default. - - Let us revisit the `PulsarListener` code snippet we saw in the quick-tour section: [source, java] diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java index aafe914a1..26b157f2d 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.springframework.core.log.LogAccessor; import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory; @@ -87,6 +88,7 @@ public DefaultReactivePulsarMessageListenerContainer createContainerInstance( ReactivePulsarContainerProperties properties = new ReactivePulsarContainerProperties<>(); properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver()); properties.setTopicResolver(this.getContainerProperties().getTopicResolver()); + properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType()); if (!CollectionUtils.isEmpty(endpoint.getTopics())) { properties.setTopics(endpoint.getTopics()); @@ -103,8 +105,9 @@ public DefaultReactivePulsarMessageListenerContainer createContainerInstance( if (endpoint.getSubscriptionType() != null) { properties.setSubscriptionType(endpoint.getSubscriptionType()); } - else { - properties.setSubscriptionType(this.containerProperties.getSubscriptionType()); + // Default to Exclusive if not set on container props or endpoint + if (properties.getSubscriptionType() == null) { + properties.setSubscriptionType(SubscriptionType.Exclusive); } if (endpoint.getSchemaType() != null) { diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListener.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListener.java index 3ac6ef5e8..c334e26c8 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListener.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListener.java @@ -79,7 +79,7 @@ * @return single element array with the subscription type or empty array to indicate * no type chosen by user */ - SubscriptionType[] subscriptionType() default { SubscriptionType.Exclusive }; + SubscriptionType[] subscriptionType() default {}; /** * Pulsar schema type for this listener. diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java new file mode 100644 index 000000000..02971919a --- /dev/null +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java @@ -0,0 +1,81 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reactive.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.pulsar.client.api.SubscriptionType; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory; +import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties; + +/** + * Unit tests for {@link DefaultReactivePulsarListenerContainerFactory}. + */ +class DefaultReactivePulsarListenerContainerFactoryTests { + + @SuppressWarnings("unchecked") + @Nested + class SubscriptionTypeFrom { + + @Test + void factoryPropsUsedWhenNotSetOnEndpoint() { + var factoryProps = new ReactivePulsarContainerProperties(); + factoryProps.setSubscriptionType(SubscriptionType.Shared); + var containerFactory = new DefaultReactivePulsarListenerContainerFactory( + mock(ReactivePulsarConsumerFactory.class), factoryProps); + var endpoint = mock(ReactivePulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionType()) + .isEqualTo(SubscriptionType.Shared); + } + + @Test + void endpointTakesPrecedenceOverFactoryProps() { + var factoryProps = new ReactivePulsarContainerProperties(); + factoryProps.setSubscriptionType(SubscriptionType.Shared); + var containerFactory = new DefaultReactivePulsarListenerContainerFactory( + mock(ReactivePulsarConsumerFactory.class), factoryProps); + var endpoint = mock(ReactivePulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + when(endpoint.getSubscriptionType()).thenReturn(SubscriptionType.Failover); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionType()) + .isEqualTo(SubscriptionType.Failover); + } + + @Test + void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { + var factoryProps = new ReactivePulsarContainerProperties(); + var containerFactory = new DefaultReactivePulsarListenerContainerFactory( + mock(ReactivePulsarConsumerFactory.class), factoryProps); + var endpoint = mock(ReactivePulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionType()) + .isEqualTo(SubscriptionType.Exclusive); + + } + + } + +} diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java index c268e9cb6..e8469b7ee 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java @@ -72,8 +72,7 @@ import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig; -import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig; -import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.SubscriptionTypeTestsConfig; import org.springframework.pulsar.reactive.support.MessageUtils; import org.springframework.pulsar.support.PulsarHeaders; import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper; @@ -816,117 +815,80 @@ Mono listen2(String message) { } @Nested + @ContextConfiguration(classes = SubscriptionTypeTestsConfig.class) class SubscriptionTypeTests { - @Nested - @ContextConfiguration(classes = WithDefaultTypeConfig.class) - class WithDefaultType { + static final CountDownLatch latchTypeNotSet = new CountDownLatch(1); - static final CountDownLatch latchTypeNotSet = new CountDownLatch(1); + static final CountDownLatch latchTypeSetOnAnnotation = new CountDownLatch(1); - @Test - void whenTypeNotSetAnywhereThenFallbackTypeIsUsed( - @Autowired ConsumerTrackingReactivePulsarConsumerFactory consumerFactory) throws Exception { - assertThat(consumerFactory.topicNameToConsumerSpec).hasEntrySatisfying("rpl-typeNotSetAnywhere-topic", - (consumerSpec) -> assertThat(consumerSpec.getSubscriptionType()) - .isEqualTo(SubscriptionType.Exclusive)); - pulsarTemplate.send("rpl-typeNotSetAnywhere-topic", "hello-rpl-typeNotSetAnywhere"); - assertThat(latchTypeNotSet.await(10, TimeUnit.SECONDS)).isTrue(); - } - - @Configuration(proxyBeanMethods = false) - static class WithDefaultTypeConfig { - - @ReactivePulsarListener(topics = "rpl-typeNotSetAnywhere-topic", - subscriptionName = "rpl-typeNotSetAnywhere-sub", - consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenWithoutTypeSetAnywhere(String ignored) { - latchTypeNotSet.countDown(); - return Mono.empty(); - } - - } + static final CountDownLatch latchTypeSetOnCustomizer = new CountDownLatch(1); + @Test + void defaultTypeFromContainerFactoryUsedWhenTypeNotSetAnywhere( + @Autowired ConsumerTrackingReactivePulsarConsumerFactory consumerFactory) throws Exception { + var topic = "rpl-latchTypeNotSet-topic"; + assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType) + .isEqualTo(SubscriptionType.Exclusive); + pulsarTemplate.send(topic, "hello-" + topic); + assertThat(latchTypeNotSet.await(5, TimeUnit.SECONDS)).isTrue(); } - @Nested - @ContextConfiguration(classes = WithSpecificTypesConfig.class) - class WithSpecificTypes { - - static final CountDownLatch latchTypeSetConsumerFactory = new CountDownLatch(1); + @Test + void typeSetOnAnnotationOverridesDefaultTypeFromContainerFactory( + @Autowired ConsumerTrackingReactivePulsarConsumerFactory consumerFactory) throws Exception { + var topic = "rpl-typeSetOnAnnotation-topic"; + assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType) + .isEqualTo(SubscriptionType.Key_Shared); + pulsarTemplate.send(topic, "hello-" + topic); + assertThat(latchTypeSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue(); + } - static final CountDownLatch latchTypeSetAnnotation = new CountDownLatch(1); + @Test + void typeSetOnCustomizerOverridesTypeSetOnAnnotation( + @Autowired ConsumerTrackingReactivePulsarConsumerFactory consumerFactory) throws Exception { + var topic = "rpl-typeSetOnCustomizer-topic"; + assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType) + .isEqualTo(SubscriptionType.Failover); + pulsarTemplate.send(topic, "hello-" + topic); + assertThat(latchTypeSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue(); + } - static final CountDownLatch latchWithCustomizer = new CountDownLatch(1); + @Configuration(proxyBeanMethods = false) + static class SubscriptionTypeTestsConfig { - @Test - void whenTypeSetOnlyInConsumerFactoryThenConsumerFactoryTypeIsUsed( - @Autowired ConsumerTrackingReactivePulsarConsumerFactory consumerFactory) throws Exception { - assertThat(consumerFactory.getSpec("rpl-typeSetConsumerFactory-topic")) - .extracting(ReactiveMessageConsumerSpec::getSubscriptionType) - .isEqualTo(SubscriptionType.Shared); - pulsarTemplate.send("rpl-typeSetConsumerFactory-topic", "hello-rpl-typeSetConsumerFactory"); - assertThat(latchTypeSetConsumerFactory.await(10, TimeUnit.SECONDS)).isTrue(); + @Bean + ReactiveMessageConsumerBuilderCustomizer consumerFactoryDefaultSubTypeCustomizer() { + return (b) -> b.subscriptionType(SubscriptionType.Shared); } - @Test - void whenTypeSetOnAnnotationThenAnnotationTypeIsUsed( - @Autowired ConsumerTrackingReactivePulsarConsumerFactory consumerFactory) throws Exception { - assertThat(consumerFactory.getSpec("rpl-typeSetAnnotation-topic")) - .extracting(ReactiveMessageConsumerSpec::getSubscriptionType) - .isEqualTo(SubscriptionType.Key_Shared); - pulsarTemplate.send("rpl-typeSetAnnotation-topic", "hello-rpl-typeSetAnnotation"); - assertThat(latchTypeSetAnnotation.await(10, TimeUnit.SECONDS)).isTrue(); + @ReactivePulsarListener(topics = "rpl-latchTypeNotSet-topic", subscriptionName = "rpl-latchTypeNotSet-sub", + consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono listenWithoutTypeSetAnywhere(String ignored) { + latchTypeNotSet.countDown(); + return Mono.empty(); } - @Test - void whenTypeSetWithCustomizerThenCustomizerTypeIsUsed( - @Autowired ConsumerTrackingReactivePulsarConsumerFactory consumerFactory) throws Exception { - assertThat(consumerFactory.getSpec("rpl-typeSetCustomizer-topic")) - .extracting(ReactiveMessageConsumerSpec::getSubscriptionType) - .isEqualTo(SubscriptionType.Failover); - pulsarTemplate.send("rpl-typeSetCustomizer-topic", "hello-rpl-typeSetCustomizer"); - assertThat(latchWithCustomizer.await(10, TimeUnit.SECONDS)).isTrue(); + @ReactivePulsarListener(topics = "rpl-typeSetOnAnnotation-topic", + subscriptionName = "rpl-typeSetOnAnnotation-sub", subscriptionType = SubscriptionType.Key_Shared, + consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono listenWithTypeSetOnAnnotation(String ignored) { + latchTypeSetOnAnnotation.countDown(); + return Mono.empty(); } - @Configuration(proxyBeanMethods = false) - static class WithSpecificTypesConfig { - - @Bean - ReactiveMessageConsumerBuilderCustomizer consumerFactoryDefaultSubTypeCustomizer() { - return (b) -> b.subscriptionType(SubscriptionType.Shared); - } - - @ReactivePulsarListener(topics = "rpl-typeSetConsumerFactory-topic", - subscriptionName = "rpl-typeSetConsumerFactory-sub", subscriptionType = {}, - consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenWithTypeSetOnlyOnConsumerFactory(String ignored) { - latchTypeSetConsumerFactory.countDown(); - return Mono.empty(); - } - - @ReactivePulsarListener(topics = "rpl-typeSetAnnotation-topic", - subscriptionName = "rpl-typeSetAnnotation-sub", subscriptionType = SubscriptionType.Key_Shared, - consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenWithTypeSetOnAnnotation(String ignored) { - latchTypeSetAnnotation.countDown(); - return Mono.empty(); - } - - @ReactivePulsarListener(topics = "rpl-typeSetCustomizer-topic", - subscriptionName = "rpl-typeSetCustomizer-sub", subscriptionType = SubscriptionType.Key_Shared, - consumerCustomizer = "myCustomizer") - Mono listenWithTypeSetInCustomizer(String ignored) { - latchWithCustomizer.countDown(); - return Mono.empty(); - } - - @Bean - public ReactivePulsarListenerMessageConsumerBuilderCustomizer myCustomizer() { - return cb -> cb.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscriptionType(SubscriptionType.Failover); - } + @ReactivePulsarListener(topics = "rpl-typeSetOnCustomizer-topic", + subscriptionName = "rpl-typeSetOnCustomizer-sub", subscriptionType = SubscriptionType.Key_Shared, + consumerCustomizer = "myCustomizer") + Mono listenWithTypeSetOnCustomizer(String ignored) { + latchTypeSetOnCustomizer.countDown(); + return Mono.empty(); + } + @Bean + public ReactivePulsarListenerMessageConsumerBuilderCustomizer myCustomizer() { + return cb -> cb.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionType(SubscriptionType.Failover); } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListener.java b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListener.java index bd69bf576..0f4ffa3e0 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListener.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListener.java @@ -81,7 +81,7 @@ * @return single element array with the subscription type or empty array to indicate * no type chosen by user */ - SubscriptionType[] subscriptionType() default { SubscriptionType.Exclusive }; + SubscriptionType[] subscriptionType() default {}; /** * Pulsar schema type for this listener. diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java index 5a947ea72..abe86a0d0 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java @@ -20,6 +20,8 @@ import java.util.Collection; import java.util.HashSet; +import org.apache.pulsar.client.api.SubscriptionType; + import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.listener.ConcurrentPulsarMessageListenerContainer; import org.springframework.pulsar.listener.PulsarContainerProperties; @@ -74,6 +76,7 @@ protected ConcurrentPulsarMessageListenerContainer createContainerInstance(Pu PulsarContainerProperties properties = new PulsarContainerProperties(); properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver()); properties.setTopicResolver(this.getContainerProperties().getTopicResolver()); + properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType()); var parentTxnProps = this.getContainerProperties().transactions(); var childTxnProps = properties.transactions(); @@ -102,6 +105,10 @@ protected ConcurrentPulsarMessageListenerContainer createContainerInstance(Pu if (endpoint.getSubscriptionType() != null) { properties.setSubscriptionType(endpoint.getSubscriptionType()); } + // Default to Exclusive if not set on container props or endpoint + if (properties.getSubscriptionType() == null) { + properties.setSubscriptionType(SubscriptionType.Exclusive); + } properties.setSchemaType(endpoint.getSchemaType()); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java index cb3dc2d81..18ea5c4fc 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java @@ -304,11 +304,9 @@ else if (messageListener != null) { topicNames, this.containerProperties.getSubscriptionName(), properties, customizers); Assert.state(this.consumer != null, "Unable to create a consumer"); - // If subtype is null - update it based on the actual subtype of the - // underlying consumer - if (this.subscriptionType == null) { - updateSubscriptionTypeFromConsumer(this.consumer); - } + // Update sub type from underlying consumer as customizer from annotation + // may have updated it + updateSubscriptionTypeFromConsumer(this.consumer); } catch (PulsarException e) { DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Pulsar exception."); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java index e1f9d7ca9..1967efc75 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java @@ -212,6 +212,51 @@ private record PulsarListenerMockComponents(PulsarConsumerFactory consum Consumer consumer, ConcurrentPulsarMessageListenerContainer concurrentContainer) { } + @SuppressWarnings("unchecked") + @Nested + class SubscriptionTypeFrom { + + @Test + void factoryPropsUsedWhenNotSetOnEndpoint() { + var factoryProps = new PulsarContainerProperties(); + factoryProps.setSubscriptionType(SubscriptionType.Shared); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionType()) + .isEqualTo(SubscriptionType.Shared); + } + + @Test + void endpointTakesPrecedenceOverFactoryProps() { + var factoryProps = new PulsarContainerProperties(); + factoryProps.setSubscriptionType(SubscriptionType.Shared); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + when(endpoint.getSubscriptionType()).thenReturn(SubscriptionType.Failover); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionType()) + .isEqualTo(SubscriptionType.Failover); + } + + @Test + void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { + var factoryProps = new PulsarContainerProperties(); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionType()) + .isEqualTo(SubscriptionType.Exclusive); + } + + } + @Nested class ObservationConfigurationTests { diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java index e8ae9e411..8f15f855e 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java @@ -73,8 +73,7 @@ import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.listener.PulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig; -import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig; -import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig; +import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.SubscriptionTypeTestsConfig; import org.springframework.pulsar.support.PulsarHeaders; import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper; import org.springframework.pulsar.test.model.UserPojo; @@ -1055,102 +1054,71 @@ void listen(String msg) { } @Nested + @ContextConfiguration(classes = SubscriptionTypeTestsConfig.class) class SubscriptionTypeTests { - @SuppressWarnings("rawtypes") - private static AbstractObjectAssert assertSubscriptionType(Consumer consumer) { - return assertThat(consumer) - .extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class)) - .extracting(ConsumerConfigurationData::getSubscriptionType); - } - - @Nested - @ContextConfiguration(classes = WithDefaultTypeConfig.class) - class WithDefaultType { + static final CountDownLatch latchTypeNotSet = new CountDownLatch(1); - static final CountDownLatch latchTypeNotSet = new CountDownLatch(1); - - @Test - void whenTypeNotSetAnywhereThenFallbackTypeIsUsed() throws Exception { - pulsarTemplate.send("typeNotSetAnywhere-topic", "hello-typeNotSetAnywhere"); - assertThat(latchTypeNotSet.await(5, TimeUnit.SECONDS)).isTrue(); - } - - @Configuration(proxyBeanMethods = false) - static class WithDefaultTypeConfig { - - @PulsarListener(topics = "typeNotSetAnywhere-topic", subscriptionName = "typeNotSetAnywhere-sub") - void listenWithoutTypeSetAnywhere(String ignored, Consumer consumer) { - assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Exclusive); - latchTypeNotSet.countDown(); - } + static final CountDownLatch latchTypeSetOnAnnotation = new CountDownLatch(1); - } + static final CountDownLatch latchTypeSetOnCustomizer = new CountDownLatch(1); + @Test + void defaultTypeFromContainerFactoryUsedWhenTypeNotSetAnywhere() throws Exception { + pulsarTemplate.send("latchTypeNotSet-topic", "hello-latchTypeNotSet"); + assertThat(latchTypeNotSet.await(5, TimeUnit.SECONDS)).isTrue(); } - @Nested - @ContextConfiguration(classes = WithSpecificTypesConfig.class) - class WithSpecificTypes { - - static final CountDownLatch latchTypeSetConsumerFactory = new CountDownLatch(1); + @Test + void typeSetOnAnnotationOverridesDefaultTypeFromContainerFactory() throws Exception { + pulsarTemplate.send("typeSetOnAnnotation-topic", "hello-typeSetOnAnnotation"); + assertThat(latchTypeSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue(); + } - static final CountDownLatch latchTypeSetAnnotation = new CountDownLatch(1); + @Test + void typeSetOnCustomizerOverridesTypeSetOnAnnotation() throws Exception { + pulsarTemplate.send("typeSetOnCustomizer-topic", "hello-typeSetOnCustomizer"); + assertThat(latchTypeSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue(); + } - static final CountDownLatch latchWithCustomizer = new CountDownLatch(1); + @Configuration(proxyBeanMethods = false) + static class SubscriptionTypeTestsConfig { - @Test - void whenTypeSetOnlyInConsumerFactoryThenConsumerFactoryTypeIsUsed() throws Exception { - pulsarTemplate.send("typeSetConsumerFactory-topic", "hello-typeSetConsumerFactory"); - assertThat(latchTypeSetConsumerFactory.await(5, TimeUnit.SECONDS)).isTrue(); + @Bean + ConsumerBuilderCustomizer consumerFactoryCustomizerSubTypeIsIgnored() { + return (b) -> b.subscriptionType(SubscriptionType.Shared); } - @Test - void whenTypeSetOnAnnotationThenAnnotationTypeIsUsed() throws Exception { - pulsarTemplate.send("typeSetAnnotation-topic", "hello-typeSetAnnotation"); - assertThat(latchTypeSetAnnotation.await(5, TimeUnit.SECONDS)).isTrue(); + @PulsarListener(topics = "latchTypeNotSet-topic", subscriptionName = "latchTypeNotSet-sub") + void listenWithTypeNotSet(String ignored, Consumer consumer) { + assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Exclusive); + latchTypeNotSet.countDown(); } - @Test - void whenTypeSetWithCustomizerThenCustomizerTypeIsUsed() throws Exception { - pulsarTemplate.send("typeSetCustomizer-topic", "hello-typeSetCustomizer"); - assertThat(latchWithCustomizer.await(5, TimeUnit.SECONDS)).isTrue(); + @PulsarListener(topics = "typeSetOnAnnotation-topic", subscriptionName = "typeSetOnAnnotation-sub", + subscriptionType = SubscriptionType.Key_Shared) + void listenWithTypeSetOnAnnotation(String ignored, Consumer consumer) { + assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Key_Shared); + latchTypeSetOnAnnotation.countDown(); } - @Configuration(proxyBeanMethods = false) - static class WithSpecificTypesConfig { - - @Bean - ConsumerBuilderCustomizer consumerFactoryDefaultSubTypeCustomizer() { - return (b) -> b.subscriptionType(SubscriptionType.Shared); - } - - @PulsarListener(topics = "typeSetConsumerFactory-topic", - subscriptionName = "typeSetConsumerFactory-sub", subscriptionType = {}) - void listenWithTypeSetOnlyOnConsumerFactory(String ignored, Consumer consumer) { - assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Shared); - latchTypeSetConsumerFactory.countDown(); - } - - @PulsarListener(topics = "typeSetAnnotation-topic", subscriptionName = "typeSetAnnotation-sub", - subscriptionType = SubscriptionType.Key_Shared) - void listenWithTypeSetOnAnnotation(String ignored, Consumer consumer) { - assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Key_Shared); - latchTypeSetAnnotation.countDown(); - } - - @PulsarListener(topics = "typeSetCustomizer-topic", subscriptionName = "typeSetCustomizer-sub", - subscriptionType = SubscriptionType.Key_Shared, consumerCustomizer = "myCustomizer") - void listenWithTypeSetInCustomizer(String ignored, Consumer consumer) { - assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Failover); - latchWithCustomizer.countDown(); - } + @PulsarListener(topics = "typeSetOnCustomizer-topic", subscriptionName = "typeSetOnCustomizer-sub", + subscriptionType = SubscriptionType.Key_Shared, consumerCustomizer = "myCustomizer") + void listenWithTypeSetOnCustomizer(String ignored, Consumer consumer) { + assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Failover); + latchTypeSetOnCustomizer.countDown(); + } - @Bean - public PulsarListenerConsumerBuilderCustomizer myCustomizer() { - return cb -> cb.subscriptionType(SubscriptionType.Failover); - } + @Bean + public PulsarListenerConsumerBuilderCustomizer myCustomizer() { + return cb -> cb.subscriptionType(SubscriptionType.Failover); + } + @SuppressWarnings("rawtypes") + private static AbstractObjectAssert assertSubscriptionType(Consumer consumer) { + return assertThat(consumer) + .extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class)) + .extracting(ConsumerConfigurationData::getSubscriptionType); } }