Skip to content

Commit

Permalink
Move default subscription type to factory (#818)
Browse files Browse the repository at this point in the history
This commit moves the default subscription type from the
`@PulsarListener` and `@ReactivePulsarListener` annotation to the
associated container factory (props) which allows the Spring Boot
`spring.pulsar.consumer.subscription-type` config prop to be respected.

See spring-projects/spring-boot#42053
  • Loading branch information
onobc authored Aug 30, 2024
1 parent 01a95a7 commit 5946c95
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ Spring Boot provides this consumer factory which you can further configure by sp

TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is instead generated when not specified on the annotation.

TIP: The `spring.pulsar.consumer.subscription.type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default.


Let us revisit the `PulsarListener` code snippet we saw in the quick-tour section:

[source, java]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
Expand Down Expand Up @@ -87,6 +88,7 @@ public DefaultReactivePulsarMessageListenerContainer<T> createContainerInstance(
ReactivePulsarContainerProperties<T> properties = new ReactivePulsarContainerProperties<>();
properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver());
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());
properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType());

if (!CollectionUtils.isEmpty(endpoint.getTopics())) {
properties.setTopics(endpoint.getTopics());
Expand All @@ -103,8 +105,9 @@ public DefaultReactivePulsarMessageListenerContainer<T> createContainerInstance(
if (endpoint.getSubscriptionType() != null) {
properties.setSubscriptionType(endpoint.getSubscriptionType());
}
else {
properties.setSubscriptionType(this.containerProperties.getSubscriptionType());
// Default to Exclusive if not set on container props or endpoint
if (properties.getSubscriptionType() == null) {
properties.setSubscriptionType(SubscriptionType.Exclusive);
}

if (endpoint.getSchemaType() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
* @return single element array with the subscription type or empty array to indicate
* no type chosen by user
*/
SubscriptionType[] subscriptionType() default { SubscriptionType.Exclusive };
SubscriptionType[] subscriptionType() default {};

/**
* Pulsar schema type for this listener.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.reactive.config;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.pulsar.client.api.SubscriptionType;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties;

/**
* Unit tests for {@link DefaultReactivePulsarListenerContainerFactory}.
*/
class DefaultReactivePulsarListenerContainerFactoryTests {

@SuppressWarnings("unchecked")
@Nested
class SubscriptionTypeFrom {

@Test
void factoryPropsUsedWhenNotSetOnEndpoint() {
var factoryProps = new ReactivePulsarContainerProperties<String>();
factoryProps.setSubscriptionType(SubscriptionType.Shared);
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
var createdContainer = containerFactory.createListenerContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
.isEqualTo(SubscriptionType.Shared);
}

@Test
void endpointTakesPrecedenceOverFactoryProps() {
var factoryProps = new ReactivePulsarContainerProperties<String>();
factoryProps.setSubscriptionType(SubscriptionType.Shared);
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
when(endpoint.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
var createdContainer = containerFactory.createListenerContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
.isEqualTo(SubscriptionType.Failover);
}

@Test
void defaultUsedWhenNotSetOnEndpointNorFactoryProps() {
var factoryProps = new ReactivePulsarContainerProperties<String>();
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
var createdContainer = containerFactory.createListenerContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
.isEqualTo(SubscriptionType.Exclusive);

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.SubscriptionTypeTestsConfig;
import org.springframework.pulsar.reactive.support.MessageUtils;
import org.springframework.pulsar.support.PulsarHeaders;
import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper;
Expand Down Expand Up @@ -816,117 +815,80 @@ Mono<Void> listen2(String message) {
}

@Nested
@ContextConfiguration(classes = SubscriptionTypeTestsConfig.class)
class SubscriptionTypeTests {

@Nested
@ContextConfiguration(classes = WithDefaultTypeConfig.class)
class WithDefaultType {
static final CountDownLatch latchTypeNotSet = new CountDownLatch(1);

static final CountDownLatch latchTypeNotSet = new CountDownLatch(1);
static final CountDownLatch latchTypeSetOnAnnotation = new CountDownLatch(1);

@Test
void whenTypeNotSetAnywhereThenFallbackTypeIsUsed(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
assertThat(consumerFactory.topicNameToConsumerSpec).hasEntrySatisfying("rpl-typeNotSetAnywhere-topic",
(consumerSpec) -> assertThat(consumerSpec.getSubscriptionType())
.isEqualTo(SubscriptionType.Exclusive));
pulsarTemplate.send("rpl-typeNotSetAnywhere-topic", "hello-rpl-typeNotSetAnywhere");
assertThat(latchTypeNotSet.await(10, TimeUnit.SECONDS)).isTrue();
}

@Configuration(proxyBeanMethods = false)
static class WithDefaultTypeConfig {

@ReactivePulsarListener(topics = "rpl-typeNotSetAnywhere-topic",
subscriptionName = "rpl-typeNotSetAnywhere-sub",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithoutTypeSetAnywhere(String ignored) {
latchTypeNotSet.countDown();
return Mono.empty();
}

}
static final CountDownLatch latchTypeSetOnCustomizer = new CountDownLatch(1);

@Test
void defaultTypeFromContainerFactoryUsedWhenTypeNotSetAnywhere(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
var topic = "rpl-latchTypeNotSet-topic";
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Exclusive);
pulsarTemplate.send(topic, "hello-" + topic);
assertThat(latchTypeNotSet.await(5, TimeUnit.SECONDS)).isTrue();
}

@Nested
@ContextConfiguration(classes = WithSpecificTypesConfig.class)
class WithSpecificTypes {

static final CountDownLatch latchTypeSetConsumerFactory = new CountDownLatch(1);
@Test
void typeSetOnAnnotationOverridesDefaultTypeFromContainerFactory(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
var topic = "rpl-typeSetOnAnnotation-topic";
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Key_Shared);
pulsarTemplate.send(topic, "hello-" + topic);
assertThat(latchTypeSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue();
}

static final CountDownLatch latchTypeSetAnnotation = new CountDownLatch(1);
@Test
void typeSetOnCustomizerOverridesTypeSetOnAnnotation(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
var topic = "rpl-typeSetOnCustomizer-topic";
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Failover);
pulsarTemplate.send(topic, "hello-" + topic);
assertThat(latchTypeSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue();
}

static final CountDownLatch latchWithCustomizer = new CountDownLatch(1);
@Configuration(proxyBeanMethods = false)
static class SubscriptionTypeTestsConfig {

@Test
void whenTypeSetOnlyInConsumerFactoryThenConsumerFactoryTypeIsUsed(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
assertThat(consumerFactory.getSpec("rpl-typeSetConsumerFactory-topic"))
.extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Shared);
pulsarTemplate.send("rpl-typeSetConsumerFactory-topic", "hello-rpl-typeSetConsumerFactory");
assertThat(latchTypeSetConsumerFactory.await(10, TimeUnit.SECONDS)).isTrue();
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerFactoryDefaultSubTypeCustomizer() {
return (b) -> b.subscriptionType(SubscriptionType.Shared);
}

@Test
void whenTypeSetOnAnnotationThenAnnotationTypeIsUsed(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
assertThat(consumerFactory.getSpec("rpl-typeSetAnnotation-topic"))
.extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Key_Shared);
pulsarTemplate.send("rpl-typeSetAnnotation-topic", "hello-rpl-typeSetAnnotation");
assertThat(latchTypeSetAnnotation.await(10, TimeUnit.SECONDS)).isTrue();
@ReactivePulsarListener(topics = "rpl-latchTypeNotSet-topic", subscriptionName = "rpl-latchTypeNotSet-sub",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithoutTypeSetAnywhere(String ignored) {
latchTypeNotSet.countDown();
return Mono.empty();
}

@Test
void whenTypeSetWithCustomizerThenCustomizerTypeIsUsed(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
assertThat(consumerFactory.getSpec("rpl-typeSetCustomizer-topic"))
.extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Failover);
pulsarTemplate.send("rpl-typeSetCustomizer-topic", "hello-rpl-typeSetCustomizer");
assertThat(latchWithCustomizer.await(10, TimeUnit.SECONDS)).isTrue();
@ReactivePulsarListener(topics = "rpl-typeSetOnAnnotation-topic",
subscriptionName = "rpl-typeSetOnAnnotation-sub", subscriptionType = SubscriptionType.Key_Shared,
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithTypeSetOnAnnotation(String ignored) {
latchTypeSetOnAnnotation.countDown();
return Mono.empty();
}

@Configuration(proxyBeanMethods = false)
static class WithSpecificTypesConfig {

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerFactoryDefaultSubTypeCustomizer() {
return (b) -> b.subscriptionType(SubscriptionType.Shared);
}

@ReactivePulsarListener(topics = "rpl-typeSetConsumerFactory-topic",
subscriptionName = "rpl-typeSetConsumerFactory-sub", subscriptionType = {},
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithTypeSetOnlyOnConsumerFactory(String ignored) {
latchTypeSetConsumerFactory.countDown();
return Mono.empty();
}

@ReactivePulsarListener(topics = "rpl-typeSetAnnotation-topic",
subscriptionName = "rpl-typeSetAnnotation-sub", subscriptionType = SubscriptionType.Key_Shared,
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithTypeSetOnAnnotation(String ignored) {
latchTypeSetAnnotation.countDown();
return Mono.empty();
}

@ReactivePulsarListener(topics = "rpl-typeSetCustomizer-topic",
subscriptionName = "rpl-typeSetCustomizer-sub", subscriptionType = SubscriptionType.Key_Shared,
consumerCustomizer = "myCustomizer")
Mono<Void> listenWithTypeSetInCustomizer(String ignored) {
latchWithCustomizer.countDown();
return Mono.empty();
}

@Bean
public ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> cb.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Failover);
}
@ReactivePulsarListener(topics = "rpl-typeSetOnCustomizer-topic",
subscriptionName = "rpl-typeSetOnCustomizer-sub", subscriptionType = SubscriptionType.Key_Shared,
consumerCustomizer = "myCustomizer")
Mono<Void> listenWithTypeSetOnCustomizer(String ignored) {
latchTypeSetOnCustomizer.countDown();
return Mono.empty();
}

@Bean
public ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> cb.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Failover);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
* @return single element array with the subscription type or empty array to indicate
* no type chosen by user
*/
SubscriptionType[] subscriptionType() default { SubscriptionType.Exclusive };
SubscriptionType[] subscriptionType() default {};

/**
* Pulsar schema type for this listener.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Collection;
import java.util.HashSet;

import org.apache.pulsar.client.api.SubscriptionType;

import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.listener.ConcurrentPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.PulsarContainerProperties;
Expand Down Expand Up @@ -74,6 +76,7 @@ protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(Pu
PulsarContainerProperties properties = new PulsarContainerProperties();
properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver());
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());
properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType());

var parentTxnProps = this.getContainerProperties().transactions();
var childTxnProps = properties.transactions();
Expand Down Expand Up @@ -102,6 +105,10 @@ protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(Pu
if (endpoint.getSubscriptionType() != null) {
properties.setSubscriptionType(endpoint.getSubscriptionType());
}
// Default to Exclusive if not set on container props or endpoint
if (properties.getSubscriptionType() == null) {
properties.setSubscriptionType(SubscriptionType.Exclusive);
}

properties.setSchemaType(endpoint.getSchemaType());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,9 @@ else if (messageListener != null) {
topicNames, this.containerProperties.getSubscriptionName(), properties, customizers);
Assert.state(this.consumer != null, "Unable to create a consumer");

// If subtype is null - update it based on the actual subtype of the
// underlying consumer
if (this.subscriptionType == null) {
updateSubscriptionTypeFromConsumer(this.consumer);
}
// Update sub type from underlying consumer as customizer from annotation
// may have updated it
updateSubscriptionTypeFromConsumer(this.consumer);
}
catch (PulsarException e) {
DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Pulsar exception.");
Expand Down
Loading

0 comments on commit 5946c95

Please sign in to comment.