diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java index 34a22d9727..b1238cc99a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.springframework.beans.factory.BeanNameAware; +import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.config.AbstractFactoryBean; import org.springframework.context.SmartLifecycle; import org.springframework.core.log.LogAccessor; @@ -56,11 +57,12 @@ * @author Denis Washington * @author Gary Russell * @author Julien Wittouck + * @author Sanghyeok An * * @since 1.1.4 */ public class StreamsBuilderFactoryBean extends AbstractFactoryBean - implements SmartLifecycle, BeanNameAware { + implements SmartLifecycle, BeanNameAware, SmartInitializingSingleton { /** * The default {@link Duration} of {@code 10 seconds} for close timeout. @@ -356,11 +358,7 @@ public void start() { try { Assert.state(this.properties != null, "streams configuration properties must not be null"); - Topology topol = getObject().build(this.properties); // NOSONAR: getObject() cannot return null - this.infrastructureCustomizer.configureTopology(topol); - this.topology = topol; - LOGGER.debug(() -> topol.describe().toString()); - this.kafkaStreams = new KafkaStreams(topol, this.properties, this.clientSupplier); + this.kafkaStreams = new KafkaStreams(this.topology, this.properties, this.clientSupplier); this.kafkaStreams.setStateListener(this.stateListener); this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener); if (this.streamsUncaughtExceptionHandler != null) { @@ -432,6 +430,18 @@ public boolean isRunning() { } } + @Override + public void afterSingletonsInstantiated() { + try { + this.topology = getObject().build(this.properties); + this.infrastructureCustomizer.configureTopology(this.topology); + LOGGER.debug(() -> this.topology.describe().toString()); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + /** * Called whenever a {@link KafkaStreams} is added or removed. * diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java index 9da17583e5..dd0f66efb1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,6 +56,7 @@ * @author Gary Russell * @author Denis Washington * @author Soby Chacko + * @author Sanghyeok An */ @SpringJUnitConfig @DirtiesContext @@ -102,12 +103,34 @@ protected StreamsBuilder createInstance() { streamsBuilderFactoryBean.afterPropertiesSet(); StreamsBuilder builder = streamsBuilderFactoryBean.getObject(); builder.stream(Pattern.compile("foo")); + streamsBuilderFactoryBean.afterSingletonsInstantiated(); streamsBuilderFactoryBean.start(); StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject(); verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties()); assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull(); } + @Test + public void testGetTopologyBeforeKafkaStreamsStart() throws Exception { + // Given + streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) { + @Override + protected StreamsBuilder createInstance() { + return spy(super.createInstance()); + } + }; + streamsBuilderFactoryBean.afterPropertiesSet(); + StreamsBuilder builder = streamsBuilderFactoryBean.getObject(); + builder.stream(Pattern.compile("test-topic")); + + // When + streamsBuilderFactoryBean.afterSingletonsInstantiated(); + + // Then + assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull(); + assertThat(streamsBuilderFactoryBean.isRunning()).isFalse(); + } + @Configuration @EnableKafkaStreams public static class KafkaStreamsConfig { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java index 9abc3f3321..c412f155d9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,8 @@ import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import java.util.Properties; -import java.util.regex.Pattern; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.junit.jupiter.api.Test; @@ -43,6 +43,7 @@ * @author Soby Chacko * @author Artem Bilan * @author Gary Russell + * @author Sanghyeok An */ @SpringJUnitConfig @DirtiesContext @@ -72,11 +73,12 @@ public void testStreamBuilderFactoryCannotBeInstantiatedWhenAutoStart() { @Test public void testStreamsBuilderFactoryWithConfigProvidedLater() throws Exception { + boolean isAutoStartUp = true; Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); streamsBuilderFactoryBean.setStreamsConfiguration(props); - streamsBuilderFactoryBean.getObject().stream(Pattern.compile("foo")); + streamsBuilderFactoryBean.setAutoStartup(isAutoStartUp); assertThat(streamsBuilderFactoryBean.isRunning()).isFalse(); streamsBuilderFactoryBean.start(); @@ -95,6 +97,23 @@ public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() { return streamsBuilderFactoryBean; } + @Bean + public KafkaStreamsService kafkaStreamsService(StreamsBuilder streamsBuilder) { + return new KafkaStreamsService(streamsBuilder); + } + } + static class KafkaStreamsService { + private final StreamsBuilder streamsBuilder; + + KafkaStreamsService(StreamsBuilder streamsBuilder) { + this.streamsBuilder = streamsBuilder; + buildPipeline(); + } + + void buildPipeline() { + this.streamsBuilder.stream("test-topic"); + } + } }