Skip to content

Commit

Permalink
Add IT for config props driven listener (#822)
Browse files Browse the repository at this point in the history
This commit adds an integration test to verify the following Spring Boot
config props can be used to configure `@PulsarListener` and
`@ReactivePulsarListener`:

- `spring.pulsar.consumer.topic`
- `spring.pulsar.consumer.subscription.name`
- `spring.pulsar.consumer.subscription.type`

See spring-projects/spring-boot#42053
  • Loading branch information
onobc authored Sep 6, 2024
1 parent a518e11 commit 0012125
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.schema.SchemaType;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import org.springframework.boot.SpringApplication;
Expand Down Expand Up @@ -62,12 +67,11 @@ class PulsarListenerIntegrationTests implements PulsarTestContainerSupport {
void basicPulsarListener() throws Exception {
SpringApplication app = new SpringApplication(BasicListenerConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);

try (ConfigurableApplicationContext context = app
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
@SuppressWarnings("unchecked")
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
pulsarTemplate.send("plt-basic-topic", "John Doe");
pulsarTemplate.send("plit-basic-topic", "John Doe");
assertThat(LATCH_1.await(20, TimeUnit.SECONDS)).isTrue();
}
}
Expand All @@ -76,12 +80,11 @@ void basicPulsarListener() throws Exception {
void basicPulsarListenerCustomType() throws Exception {
SpringApplication app = new SpringApplication(BasicListenerCustomTypeConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);

try (ConfigurableApplicationContext context = app
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
@SuppressWarnings("unchecked")
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
pulsarTemplate.send("plt-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class));
pulsarTemplate.send("plit-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class));
assertThat(LATCH_2.await(20, TimeUnit.SECONDS)).isTrue();
}
}
Expand All @@ -90,12 +93,11 @@ void basicPulsarListenerCustomType() throws Exception {
void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception {
SpringApplication app = new SpringApplication(BasicListenerCustomTypeWithTypeMappingConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);

try (ConfigurableApplicationContext context = app
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
@SuppressWarnings("unchecked")
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
pulsarTemplate.send("plt-foo-topic2", new Foo("John Doe"));
pulsarTemplate.send("plit-foo-topic2", new Foo("John Doe"));
assertThat(LATCH_3.await(20, TimeUnit.SECONDS)).isTrue();
}
}
Expand All @@ -104,12 +106,11 @@ void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception {
void basicPulsarListenerWithTopicMapping() throws Exception {
SpringApplication app = new SpringApplication(BasicListenerWithTopicMappingConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);

try (ConfigurableApplicationContext context = app
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
@SuppressWarnings("unchecked")
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
pulsarTemplate.send("plt-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class));
pulsarTemplate.send("plit-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class));
assertThat(LATCH_4.await(20, TimeUnit.SECONDS)).isTrue();
}
}
Expand All @@ -118,23 +119,63 @@ void basicPulsarListenerWithTopicMapping() throws Exception {
void batchPulsarListener() throws Exception {
SpringApplication app = new SpringApplication(BatchListenerConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);

try (ConfigurableApplicationContext context = app
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
@SuppressWarnings("unchecked")
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
for (int i = 0; i < 10; i++) {
pulsarTemplate.send("plt-batch-topic", "John Doe");
pulsarTemplate.send("plit-batch-topic", "John Doe");
}
assertThat(LATCH_5.await(10, TimeUnit.SECONDS)).isTrue();
}
}

@Nested
class ConfigPropsDrivenListener {

private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1);

@Test
void subscriptionConfigPropsAreRespectedOnListener() throws Exception {
SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
"--my.env=dev", "--spring.pulsar.consumer.topics=plit-config-props-topic-${my.env}",
"--spring.pulsar.consumer.subscription.type=Shared",
"--spring.pulsar.consumer.subscription.name=plit-config-props-subs-${my.env}")) {
@SuppressWarnings("unchecked")
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
pulsarTemplate.send("plit-config-props-topic-dev", "hello config props driven");
assertThat(LATCH_CONFIG_PROPS.await(10, TimeUnit.SECONDS)).isTrue();
}

}

@EnableAutoConfiguration
@SpringBootConfiguration
static class ConfigPropsDrivenListenerConfig {

@PulsarListener
public void listen(String ignored, Consumer<String> consumer) {
assertThat(consumer).extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class))
.satisfies((conf) -> {
assertThat(conf.getSingleTopic()).isEqualTo("plit-config-props-topic-dev");
assertThat(conf.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(conf.getSubscriptionName()).isEqualTo("plit-config-props-subs-dev");
});
LATCH_CONFIG_PROPS.countDown();
}

}

}

@EnableAutoConfiguration
@SpringBootConfiguration
static class BasicListenerConfig {

@PulsarListener(subscriptionName = "plt-basic-sub", topics = "plt-basic-topic")
@PulsarListener(subscriptionName = "plit-basic-sub", topics = "plit-basic-topic")
public void listen(String ignored) {
LATCH_1.countDown();
}
Expand All @@ -145,7 +186,7 @@ public void listen(String ignored) {
@SpringBootConfiguration
static class BasicListenerCustomTypeConfig {

@PulsarListener(subscriptionName = "plt-foo-sub1", topics = "plt-foo-topic1", schemaType = SchemaType.JSON)
@PulsarListener(subscriptionName = "plit-foo-sub1", topics = "plit-foo-topic1", schemaType = SchemaType.JSON)
public void listen(Foo ignored) {
LATCH_2.countDown();
}
Expand All @@ -163,7 +204,7 @@ SchemaResolver customSchemaResolver() {
return resolver;
}

@PulsarListener(subscriptionName = "plt-foo-sub2", topics = "plt-foo-topic2")
@PulsarListener(subscriptionName = "plit-foo-sub2", topics = "plit-foo-topic2")
public void listen(Foo ignored) {
LATCH_3.countDown();
}
Expand All @@ -177,11 +218,11 @@ static class BasicListenerWithTopicMappingConfig {
@Bean
TopicResolver customTopicResolver() {
DefaultTopicResolver resolver = new DefaultTopicResolver();
resolver.addCustomTopicMapping(Foo.class, "plt-topicMapping-topic");
resolver.addCustomTopicMapping(Foo.class, "plit-topicMapping-topic");
return resolver;
}

@PulsarListener(subscriptionName = "plt-topicMapping-sub", schemaType = SchemaType.JSON)
@PulsarListener(subscriptionName = "plit-topicMapping-sub", schemaType = SchemaType.JSON)
public void listen(Foo ignored) {
LATCH_4.countDown();
}
Expand All @@ -192,7 +233,7 @@ public void listen(Foo ignored) {
@SpringBootConfiguration
static class BatchListenerConfig {

@PulsarListener(subscriptionName = "plt-batch-sub", topics = "plt-batch-topic", batch = true)
@PulsarListener(subscriptionName = "plit-batch-sub", topics = "plit-batch-topic", batch = true)
public void listen(List<String> foo) {
foo.forEach(t -> LATCH_5.countDown());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.reactive.client.api.MessageResult;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.WebApplicationType;
Expand All @@ -43,8 +52,12 @@
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ObjectUtils;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -134,6 +147,75 @@ void fluxListener() throws Exception {
}
}

@Nested
class ConfigPropsDrivenListener {

private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1);

@Test
void subscriptionConfigPropsAreRespectedOnListener() throws Exception {
SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
"--my.env=dev", "--spring.pulsar.consumer.topics=rplit-config-props-topic-${my.env}",
"--spring.pulsar.consumer.subscription.type=Shared",
"--spring.pulsar.consumer.subscription.name=rplit-config-props-subs-${my.env}")) {
var topic = "persistent://public/default/rplit-config-props-topic-dev";
@SuppressWarnings("unchecked")
ReactivePulsarTemplate<String> pulsarTemplate = context.getBean(ReactivePulsarTemplate.class);
pulsarTemplate.send(topic, "hello config props driven").block();
assertThat(LATCH_CONFIG_PROPS.await(10, TimeUnit.SECONDS)).isTrue();
@SuppressWarnings("unchecked")
ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory = (ConsumerTrackingReactivePulsarConsumerFactory<String>) context
.getBean(ReactivePulsarConsumerFactory.class);
assertThat(consumerFactory.getSpec(topic)).isNotNull().satisfies((consumerSpec) -> {
assertThat(consumerSpec.getTopicNames()).containsExactly(topic);
assertThat(consumerSpec.getSubscriptionName()).isEqualTo("rplit-config-props-subs-dev");
assertThat(consumerSpec.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
});
}
}

@EnableAutoConfiguration
@SpringBootConfiguration
@Import(ConsumerCustomizerConfig.class)
static class ConfigPropsDrivenListenerConfig {

/**
* Post process the Reactive consumer factory and replace it with a tracking
* wrapper around it. Because this test requires the Spring Boot config props
* to be applied to the auto-configured consumer factory we can't simply
* replace the consumer factory bean as the config props will not be set on
* the custom consumer factory.
* @return post processor to wrap a tracker around the reactive consumer
* factory
*/
@Bean
static BeanPostProcessor consumerTrackingConsumerFactory() {
return new BeanPostProcessor() {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof ReactivePulsarConsumerFactory rcf) {
return new ConsumerTrackingReactivePulsarConsumerFactory<>(
(ReactivePulsarConsumerFactory<String>) rcf);
}
return bean;
}
};
}

@ReactivePulsarListener(consumerCustomizer = "consumerCustomizer")
public Mono<Void> listen(String ignored) {
LATCH_CONFIG_PROPS.countDown();
return Mono.empty();
}

}

}

@EnableAutoConfiguration
@SpringBootConfiguration
@Import(ConsumerCustomizerConfig.class)
Expand Down Expand Up @@ -230,4 +312,42 @@ ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> consumerCustomize
record Foo(String value) {
}

static class ConsumerTrackingReactivePulsarConsumerFactory<T> implements ReactivePulsarConsumerFactory<T> {

private Map<String, ReactiveMessageConsumerSpec> topicNameToConsumerSpec = new HashMap<>();

private ReactivePulsarConsumerFactory<T> delegate;

ConsumerTrackingReactivePulsarConsumerFactory(ReactivePulsarConsumerFactory<T> delegate) {
this.delegate = delegate;
}

@Override
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema) {
var consumer = this.delegate.createConsumer(schema);
storeSpec(consumer);
return consumer;
}

@Override
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
List<ReactiveMessageConsumerBuilderCustomizer<T>> reactiveMessageConsumerBuilderCustomizers) {
var consumer = this.delegate.createConsumer(schema, reactiveMessageConsumerBuilderCustomizers);
storeSpec(consumer);
return consumer;
}

private void storeSpec(ReactiveMessageConsumer<T> consumer) {
var consumerSpec = (ReactiveMessageConsumerSpec) ReflectionTestUtils.getField(consumer, "consumerSpec");
var topicNamesKey = !ObjectUtils.isEmpty(consumerSpec.getTopicNames()) ? consumerSpec.getTopicNames().get(0)
: "no-topics-set";
this.topicNameToConsumerSpec.put(topicNamesKey, consumerSpec);
}

ReactiveMessageConsumerSpec getSpec(String topic) {
return this.topicNameToConsumerSpec.get(topic);
}

}

}

0 comments on commit 0012125

Please sign in to comment.