Skip to content

Commit

Permalink
Move default subscription name to factory (#821)
Browse files Browse the repository at this point in the history
This commit moves the default subscription name from the
`@PulsarListener` and `@ReactivePulsarListener` annotation to the
corresponding container factory (props) which allows the
`spring.pulsar.consumer.subscription.name` config prop to be respected.

See spring-projects/spring-boot#42053
  • Loading branch information
onobc authored Aug 31, 2024
1 parent 5946c95 commit 136d465
Show file tree
Hide file tree
Showing 12 changed files with 391 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ When you use Spring Boot support, it automatically enables this annotation and c
`PulsarMessageListenerContainer` uses a `PulsarConsumerFactory` to create and manage the Pulsar consumer the underlying Pulsar consumer that it uses to consume messages.

Spring Boot provides this consumer factory which you can further configure by specifying the {spring-boot-pulsar-config-props}[`spring.pulsar.consumer.*`] application properties.
**Most** of the configured properties on the factory will be respected in the listener with the following **exceptions**:

TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is instead generated when not specified on the annotation.

Let us revisit the `PulsarListener` code snippet we saw in the quick-tour section:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ NOTE: There is no support for using `org.apache.pulsar.client.api.Messages<T>` i
=== Configuration - Application Properties
The listener relies on the `ReactivePulsarConsumerFactory` to create and manage the underlying Pulsar consumer that it uses to consume messages.
Spring Boot provides this consumer factory which you can further configure by specifying the {spring-boot-pulsar-config-props}[`spring.pulsar.consumer.*`] application properties.
**Most** of the configured properties on the factory will be respected in the listener with the following **exceptions**:

TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is instead generated when not specified on the annotation.

TIP: The `spring.pulsar.consumer.subscription.type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default.

=== Generic records with AUTO_CONSUME
If there is no chance to know the type of schema of a Pulsar topic in advance, you can use the `AUTO_CONSUME` schema type to consume generic records.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
Expand All @@ -39,6 +40,10 @@
*/
public class DefaultReactivePulsarListenerContainerFactory<T> implements ReactivePulsarListenerContainerFactory<T> {

private static final String SUBSCRIPTION_NAME_PREFIX = "org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#";

private static final AtomicInteger COUNTER = new AtomicInteger();

protected final LogAccessor logger = new LogAccessor(this.getClass());

private final ReactivePulsarConsumerFactory<T> consumerFactory;
Expand Down Expand Up @@ -84,58 +89,54 @@ public void setFluxListener(Boolean fluxListener) {
@SuppressWarnings("unchecked")
public DefaultReactivePulsarMessageListenerContainer<T> createContainerInstance(
ReactivePulsarListenerEndpoint<T> endpoint) {

ReactivePulsarContainerProperties<T> properties = new ReactivePulsarContainerProperties<>();
properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver());
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());
properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType());

var containerProps = new ReactivePulsarContainerProperties<T>();
var factoryProps = this.getContainerProperties();

// Map factory props (defaults) to the container props
containerProps.setSchemaResolver(factoryProps.getSchemaResolver());
containerProps.setTopicResolver(factoryProps.getTopicResolver());
containerProps.setSubscriptionType(factoryProps.getSubscriptionType());
containerProps.setSubscriptionName(factoryProps.getSubscriptionName());
containerProps.setSchemaType(factoryProps.getSchemaType());
containerProps.setConcurrency(factoryProps.getConcurrency());
containerProps.setUseKeyOrderedProcessing(factoryProps.isUseKeyOrderedProcessing());

// Map relevant props from the endpoint to the container props
if (!CollectionUtils.isEmpty(endpoint.getTopics())) {
properties.setTopics(endpoint.getTopics());
containerProps.setTopics(endpoint.getTopics());
}

if (StringUtils.hasText(endpoint.getTopicPattern())) {
properties.setTopicsPattern(endpoint.getTopicPattern());
containerProps.setTopicsPattern(endpoint.getTopicPattern());
}

if (StringUtils.hasText(endpoint.getSubscriptionName())) {
properties.setSubscriptionName(endpoint.getSubscriptionName());
}

if (endpoint.getSubscriptionType() != null) {
properties.setSubscriptionType(endpoint.getSubscriptionType());
containerProps.setSubscriptionType(endpoint.getSubscriptionType());
}
// Default to Exclusive if not set on container props or endpoint
if (properties.getSubscriptionType() == null) {
properties.setSubscriptionType(SubscriptionType.Exclusive);
// Default subscription type to Exclusive when not set elsewhere
if (containerProps.getSubscriptionType() == null) {
containerProps.setSubscriptionType(SubscriptionType.Exclusive);
}

if (endpoint.getSchemaType() != null) {
properties.setSchemaType(endpoint.getSchemaType());
if (StringUtils.hasText(endpoint.getSubscriptionName())) {
containerProps.setSubscriptionName(endpoint.getSubscriptionName());
}
else {
properties.setSchemaType(this.containerProperties.getSchemaType());
// Default subscription name to generated when not set elsewhere
if (!StringUtils.hasText(containerProps.getSubscriptionName())) {
var generatedName = SUBSCRIPTION_NAME_PREFIX + COUNTER.getAndIncrement();
containerProps.setSubscriptionName(generatedName);
}

if (properties.getSchema() == null) {
properties.setSchema((Schema<T>) Schema.BYTES);
if (endpoint.getSchemaType() != null) {
containerProps.setSchemaType(endpoint.getSchemaType());
}

if (endpoint.getConcurrency() != null) {
properties.setConcurrency(endpoint.getConcurrency());
// Default to BYTES if not set elsewhere
if (containerProps.getSchema() == null) {
containerProps.setSchema((Schema<T>) Schema.BYTES);
}
else {
properties.setConcurrency(this.containerProperties.getConcurrency());
if (endpoint.getConcurrency() != null) {
containerProps.setConcurrency(endpoint.getConcurrency());
}

if (endpoint.getUseKeyOrderedProcessing() != null) {
properties.setUseKeyOrderedProcessing(endpoint.getUseKeyOrderedProcessing());
containerProps.setUseKeyOrderedProcessing(endpoint.getUseKeyOrderedProcessing());
}
else {
properties.setUseKeyOrderedProcessing(this.containerProperties.isUseKeyOrderedProcessing());
}

return new DefaultReactivePulsarMessageListenerContainer<>(this.getConsumerFactory(), properties);
return new DefaultReactivePulsarMessageListenerContainer<>(this.getConsumerFactory(), containerProps);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,11 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene
ReactivePulsarListener reactivePulsarListener, Object bean, String[] topics, String topicPattern) {
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setSubscriptionName(getEndpointSubscriptionName(reactivePulsarListener));
endpoint.setId(getEndpointId(reactivePulsarListener));
endpoint.setTopics(topics);
endpoint.setTopicPattern(topicPattern);
resolveSubscriptionType(endpoint, reactivePulsarListener);
resolveSubscriptionName(endpoint, reactivePulsarListener);
endpoint.setSchemaType(reactivePulsarListener.schemaType());
String concurrency = reactivePulsarListener.concurrency();
if (StringUtils.hasText(concurrency)) {
Expand All @@ -257,11 +257,18 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene
}

private void resolveSubscriptionType(MethodReactivePulsarListenerEndpoint<?> endpoint,
ReactivePulsarListener reactivePulsarListener) {
Assert.state(reactivePulsarListener.subscriptionType().length <= 1,
ReactivePulsarListener listener) {
Assert.state(listener.subscriptionType().length <= 1,
() -> "ReactivePulsarListener.subscriptionType must have 0 or 1 elements");
if (reactivePulsarListener.subscriptionType().length == 1) {
endpoint.setSubscriptionType(reactivePulsarListener.subscriptionType()[0]);
if (listener.subscriptionType().length == 1) {
endpoint.setSubscriptionType(listener.subscriptionType()[0]);
}
}

private void resolveSubscriptionName(MethodReactivePulsarListenerEndpoint<?> endpoint,
ReactivePulsarListener listener) {
if (StringUtils.hasText(listener.subscriptionName())) {
endpoint.setSubscriptionName(resolveExpressionAsString(listener.subscriptionName(), "subscriptionName"));
}
}

Expand Down Expand Up @@ -322,13 +329,6 @@ private void resolveConsumerCustomizer(MethodReactivePulsarListenerEndpoint<?> e
}
}

private String getEndpointSubscriptionName(ReactivePulsarListener reactivePulsarListener) {
if (StringUtils.hasText(reactivePulsarListener.subscriptionName())) {
return resolveExpressionAsString(reactivePulsarListener.subscriptionName(), "subscriptionName");
}
return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
}

private String getEndpointId(ReactivePulsarListener reactivePulsarListener) {
if (StringUtils.hasText(reactivePulsarListener.id())) {
return resolveExpressionAsString(reactivePulsarListener.id(), "id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,55 @@ void defaultUsedWhenNotSetOnEndpointNorFactoryProps() {

}

@SuppressWarnings("unchecked")
@Nested
class SubscriptionNameFrom {

@Test
void factoryPropsUsedWhenNotSetOnEndpoint() {
var factoryProps = new ReactivePulsarContainerProperties<String>();
factoryProps.setSubscriptionName("my-factory-subscription");
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
var createdContainer = containerFactory.createListenerContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionName())
.isEqualTo("my-factory-subscription");
}

@Test
void endpointTakesPrecedenceOverFactoryProps() {
var factoryProps = new ReactivePulsarContainerProperties<String>();
factoryProps.setSubscriptionName("my-factory-subscription");
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
when(endpoint.getSubscriptionName()).thenReturn("my-endpoint-subscription");
var createdContainer = containerFactory.createListenerContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionName())
.isEqualTo("my-endpoint-subscription");
}

@Test
void defaultUsedWhenNotSetOnEndpointNorFactoryProps() {
var factoryProps = new ReactivePulsarContainerProperties<String>();
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);

var container1 = containerFactory.createListenerContainer(endpoint);
assertThat(container1.getContainerProperties().getSubscriptionName())
.startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#");
var container2 = containerFactory.createListenerContainer(endpoint);
assertThat(container2.getContainerProperties().getSubscriptionName())
.startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#");
assertThat(container1.getContainerProperties().getSubscriptionName())
.isNotEqualTo(container2.getContainerProperties().getSubscriptionName());
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.reactive.client.api.MessageResult;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -72,7 +73,7 @@
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.SubscriptionTypeTestsConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionNameTests.SubscriptionNameTestsConfig;
import org.springframework.pulsar.reactive.support.MessageUtils;
import org.springframework.pulsar.support.PulsarHeaders;
import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper;
Expand Down Expand Up @@ -815,80 +816,79 @@ Mono<Void> listen2(String message) {
}

@Nested
@ContextConfiguration(classes = SubscriptionTypeTestsConfig.class)
class SubscriptionTypeTests {
@ContextConfiguration(classes = SubscriptionNameTestsConfig.class)
class SubscriptionNameTests {

static final CountDownLatch latchTypeNotSet = new CountDownLatch(1);
static final CountDownLatch latchNameNotSet = new CountDownLatch(1);

static final CountDownLatch latchTypeSetOnAnnotation = new CountDownLatch(1);
static final CountDownLatch latchNameSetOnAnnotation = new CountDownLatch(1);

static final CountDownLatch latchTypeSetOnCustomizer = new CountDownLatch(1);
static final CountDownLatch latchNameSetOnCustomizer = new CountDownLatch(1);

@Test
void defaultTypeFromContainerFactoryUsedWhenTypeNotSetAnywhere(
void defaultNameFromContainerFactoryUsedWhenNameNotSetAnywhere(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
var topic = "rpl-latchTypeNotSet-topic";
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Exclusive);
var topic = "rpl-latchNameNotSet-topic";
assertThat(consumerFactory.getSpec(topic))
.extracting(ReactiveMessageConsumerSpec::getSubscriptionName, InstanceOfAssertFactories.STRING)
.startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#");
pulsarTemplate.send(topic, "hello-" + topic);
assertThat(latchTypeNotSet.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(latchNameNotSet.await(5, TimeUnit.SECONDS)).isTrue();
}

@Test
void typeSetOnAnnotationOverridesDefaultTypeFromContainerFactory(
void nameSetOnAnnotationOverridesDefaultNameFromContainerFactory(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
var topic = "rpl-typeSetOnAnnotation-topic";
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Key_Shared);
var topic = "rpl-nameSetOnAnnotation-topic";
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionName)
.isEqualTo("from-annotation");
pulsarTemplate.send(topic, "hello-" + topic);
assertThat(latchTypeSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(latchNameSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue();
}

@Test
void typeSetOnCustomizerOverridesTypeSetOnAnnotation(
void nameSetOnCustomizerOverridesNameSetOnAnnotation(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
var topic = "rpl-typeSetOnCustomizer-topic";
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Failover);
var topic = "rpl-nameSetOnCustomizer-topic";
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionName)
.isEqualTo("from-customizer");
pulsarTemplate.send(topic, "hello-" + topic);
assertThat(latchTypeSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(latchNameSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue();
}

@Configuration(proxyBeanMethods = false)
static class SubscriptionTypeTestsConfig {
static class SubscriptionNameTestsConfig {

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerFactoryDefaultSubTypeCustomizer() {
return (b) -> b.subscriptionType(SubscriptionType.Shared);
ReactiveMessageConsumerBuilderCustomizer<String> consumerFactoryDefaultSubNameCustomizer() {
return (b) -> b.subscriptionName("from-consumer-factory");
}

@ReactivePulsarListener(topics = "rpl-latchTypeNotSet-topic", subscriptionName = "rpl-latchTypeNotSet-sub",
@ReactivePulsarListener(topics = "rpl-latchNameNotSet-topic",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithoutTypeSetAnywhere(String ignored) {
latchTypeNotSet.countDown();
Mono<Void> listenWithoutNameSetAnywhere(String ignored) {
latchNameNotSet.countDown();
return Mono.empty();
}

@ReactivePulsarListener(topics = "rpl-typeSetOnAnnotation-topic",
subscriptionName = "rpl-typeSetOnAnnotation-sub", subscriptionType = SubscriptionType.Key_Shared,
@ReactivePulsarListener(topics = "rpl-nameSetOnAnnotation-topic", subscriptionName = "from-annotation",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithTypeSetOnAnnotation(String ignored) {
latchTypeSetOnAnnotation.countDown();
Mono<Void> listenWithNameSetOnAnnotation(String ignored) {
latchNameSetOnAnnotation.countDown();
return Mono.empty();
}

@ReactivePulsarListener(topics = "rpl-typeSetOnCustomizer-topic",
subscriptionName = "rpl-typeSetOnCustomizer-sub", subscriptionType = SubscriptionType.Key_Shared,
@ReactivePulsarListener(topics = "rpl-nameSetOnCustomizer-topic", subscriptionName = "from-annotation",
consumerCustomizer = "myCustomizer")
Mono<Void> listenWithTypeSetOnCustomizer(String ignored) {
latchTypeSetOnCustomizer.countDown();
Mono<Void> listenWithNameSetOnCustomizer(String ignored) {
latchNameSetOnCustomizer.countDown();
return Mono.empty();
}

@Bean
public ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> cb.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Failover);
.subscriptionName("from-customizer");
}

}
Expand Down
Loading

0 comments on commit 136d465

Please sign in to comment.