From 193dbb76ca6cb79751cee079d16cb82133f9f5b3 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sat, 30 Mar 2024 17:42:48 +0900 Subject: [PATCH 1/5] Create topology before kafka streams start. --- .../kafka/config/StreamsBuilderFactoryBean.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java index 34a22d9727..f94f4a8d29 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -337,6 +337,11 @@ protected StreamsBuilder createInstance() { @Override public boolean isAutoStartup() { + try { + this.topology = getObject().build(this.properties); + } catch (Exception e) { + throw new RuntimeException(e); + } return this.autoStartup; } @@ -356,11 +361,9 @@ public void start() { try { Assert.state(this.properties != null, "streams configuration properties must not be null"); - Topology topol = getObject().build(this.properties); // NOSONAR: getObject() cannot return null - this.infrastructureCustomizer.configureTopology(topol); - this.topology = topol; - LOGGER.debug(() -> topol.describe().toString()); - this.kafkaStreams = new KafkaStreams(topol, this.properties, this.clientSupplier); + this.infrastructureCustomizer.configureTopology(this.topology); + LOGGER.debug(() -> this.topology.describe().toString()); + this.kafkaStreams = new KafkaStreams(this.topology, this.properties, this.clientSupplier); this.kafkaStreams.setStateListener(this.stateListener); this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener); if (this.streamsUncaughtExceptionHandler != null) { From e65e6ff93c8494786e29535dbc20680b43531be7 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Tue, 2 Apr 2024 00:51:28 +0900 Subject: [PATCH 2/5] Fix failed tests and fix check style --- .../config/StreamsBuilderFactoryBean.java | 3 +- .../StreamsBuilderFactoryBeanTests.java | 42 +++++++++++++++++-- .../StreamsBuilderFactoryLateConfigTests.java | 6 ++- 3 files changed, 46 insertions(+), 5 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java index f94f4a8d29..d224880b84 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java @@ -339,7 +339,8 @@ protected StreamsBuilder createInstance() { public boolean isAutoStartup() { try { this.topology = getObject().build(this.properties); - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException(e); } return this.autoStartup; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java index 9da17583e5..f92af02b69 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -92,20 +92,56 @@ public void testCleanupStreams() throws IOException { } @Test - public void testBuildWithProperties() throws Exception { + public void testBuildWithPropertiesAndAutoStartUp() throws Exception { + boolean autoStartUp = true; streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) { @Override protected StreamsBuilder createInstance() { return spy(super.createInstance()); } }; + streamsBuilderFactoryBean.setAutoStartup(autoStartUp); streamsBuilderFactoryBean.afterPropertiesSet(); StreamsBuilder builder = streamsBuilderFactoryBean.getObject(); builder.stream(Pattern.compile("foo")); - streamsBuilderFactoryBean.start(); + + + boolean isAutoStartUp = streamsBuilderFactoryBean.isAutoStartup(); + if (isAutoStartUp) { + streamsBuilderFactoryBean.start(); + } + + StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject(); + verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties()); + assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull(); + assertThat(isAutoStartUp).isTrue(); + assertThat(streamsBuilderFactoryBean.isRunning()).isTrue(); + } + + @Test + public void testBuildWithPropertiesAndNoAutoStartUp() throws Exception { + boolean autoStartUp = false; + streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) { + @Override + protected StreamsBuilder createInstance() { + return spy(super.createInstance()); + } + }; + streamsBuilderFactoryBean.setAutoStartup(autoStartUp); + streamsBuilderFactoryBean.afterPropertiesSet(); + StreamsBuilder builder = streamsBuilderFactoryBean.getObject(); + builder.stream(Pattern.compile("foo")); + + boolean isAutoStartUp = streamsBuilderFactoryBean.isAutoStartup(); + if (isAutoStartUp) { + streamsBuilderFactoryBean.start(); + } + StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject(); verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties()); assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull(); + assertThat(isAutoStartUp).isFalse(); + assertThat(streamsBuilderFactoryBean.isRunning()).isFalse(); } @Configuration diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java index 9abc3f3321..d568cbe050 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,15 +72,19 @@ public void testStreamBuilderFactoryCannotBeInstantiatedWhenAutoStart() { @Test public void testStreamsBuilderFactoryWithConfigProvidedLater() throws Exception { + boolean isAutoStartUp = true; Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); streamsBuilderFactoryBean.setStreamsConfiguration(props); + streamsBuilderFactoryBean.setAutoStartup(isAutoStartUp); streamsBuilderFactoryBean.getObject().stream(Pattern.compile("foo")); assertThat(streamsBuilderFactoryBean.isRunning()).isFalse(); + boolean shouldAutoStartUp = streamsBuilderFactoryBean.isAutoStartup(); streamsBuilderFactoryBean.start(); assertThat(streamsBuilderFactoryBean.isRunning()).isTrue(); + assertThat(shouldAutoStartUp).isEqualTo(isAutoStartUp); } @Configuration From 20ba75d300dd16b7786f901a9b35a88053c72ebd Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Tue, 2 Apr 2024 07:54:53 +0900 Subject: [PATCH 3/5] Add author --- .../springframework/kafka/config/StreamsBuilderFactoryBean.java | 1 + .../kafka/config/StreamsBuilderFactoryBeanTests.java | 1 + .../kafka/config/StreamsBuilderFactoryLateConfigTests.java | 1 + 3 files changed, 3 insertions(+) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java index d224880b84..90e032918c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java @@ -56,6 +56,7 @@ * @author Denis Washington * @author Gary Russell * @author Julien Wittouck + * @author Sanghyeok An * * @since 1.1.4 */ diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java index f92af02b69..3e2570751b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java @@ -56,6 +56,7 @@ * @author Gary Russell * @author Denis Washington * @author Soby Chacko + * @author Sanghyeok An */ @SpringJUnitConfig @DirtiesContext diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java index d568cbe050..ff26153b60 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java @@ -43,6 +43,7 @@ * @author Soby Chacko * @author Artem Bilan * @author Gary Russell + * @author Sanghyeok An */ @SpringJUnitConfig @DirtiesContext From 9fafa6704fa05712f0fd84ae479f5f308ce01d2b Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Wed, 3 Apr 2024 12:08:57 +0900 Subject: [PATCH 4/5] Use SmartInitializingSingleton to build topology before start. --- .../config/StreamsBuilderFactoryBean.java | 23 ++++++++------ .../StreamsBuilderFactoryBeanTests.java | 31 ++++++------------- .../StreamsBuilderFactoryLateConfigTests.java | 21 +++++++++++-- 3 files changed, 41 insertions(+), 34 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java index 90e032918c..b1238cc99a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.springframework.beans.factory.BeanNameAware; +import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.config.AbstractFactoryBean; import org.springframework.context.SmartLifecycle; import org.springframework.core.log.LogAccessor; @@ -61,7 +62,7 @@ * @since 1.1.4 */ public class StreamsBuilderFactoryBean extends AbstractFactoryBean - implements SmartLifecycle, BeanNameAware { + implements SmartLifecycle, BeanNameAware, SmartInitializingSingleton { /** * The default {@link Duration} of {@code 10 seconds} for close timeout. @@ -338,12 +339,6 @@ protected StreamsBuilder createInstance() { @Override public boolean isAutoStartup() { - try { - this.topology = getObject().build(this.properties); - } - catch (Exception e) { - throw new RuntimeException(e); - } return this.autoStartup; } @@ -363,8 +358,6 @@ public void start() { try { Assert.state(this.properties != null, "streams configuration properties must not be null"); - this.infrastructureCustomizer.configureTopology(this.topology); - LOGGER.debug(() -> this.topology.describe().toString()); this.kafkaStreams = new KafkaStreams(this.topology, this.properties, this.clientSupplier); this.kafkaStreams.setStateListener(this.stateListener); this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener); @@ -437,6 +430,18 @@ public boolean isRunning() { } } + @Override + public void afterSingletonsInstantiated() { + try { + this.topology = getObject().build(this.properties); + this.infrastructureCustomizer.configureTopology(this.topology); + LOGGER.debug(() -> this.topology.describe().toString()); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + /** * Called whenever a {@link KafkaStreams} is added or removed. * diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java index 3e2570751b..5ef808961a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java @@ -93,58 +93,45 @@ public void testCleanupStreams() throws IOException { } @Test - public void testBuildWithPropertiesAndAutoStartUp() throws Exception { - boolean autoStartUp = true; + public void testBuildWithProperties() throws Exception { streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) { @Override protected StreamsBuilder createInstance() { return spy(super.createInstance()); } }; - streamsBuilderFactoryBean.setAutoStartup(autoStartUp); streamsBuilderFactoryBean.afterPropertiesSet(); StreamsBuilder builder = streamsBuilderFactoryBean.getObject(); builder.stream(Pattern.compile("foo")); - - - boolean isAutoStartUp = streamsBuilderFactoryBean.isAutoStartup(); - if (isAutoStartUp) { - streamsBuilderFactoryBean.start(); - } - + streamsBuilderFactoryBean.afterSingletonsInstantiated(); + streamsBuilderFactoryBean.start(); StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject(); verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties()); assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull(); - assertThat(isAutoStartUp).isTrue(); - assertThat(streamsBuilderFactoryBean.isRunning()).isTrue(); } @Test - public void testBuildWithPropertiesAndNoAutoStartUp() throws Exception { - boolean autoStartUp = false; + public void testGetTopologyBeforeKafkaStreamsStart() throws Exception { + // Given streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) { @Override protected StreamsBuilder createInstance() { return spy(super.createInstance()); } }; - streamsBuilderFactoryBean.setAutoStartup(autoStartUp); streamsBuilderFactoryBean.afterPropertiesSet(); StreamsBuilder builder = streamsBuilderFactoryBean.getObject(); builder.stream(Pattern.compile("foo")); - boolean isAutoStartUp = streamsBuilderFactoryBean.isAutoStartup(); - if (isAutoStartUp) { - streamsBuilderFactoryBean.start(); - } + // When + streamsBuilderFactoryBean.afterSingletonsInstantiated(); - StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject(); - verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties()); + // Then assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull(); - assertThat(isAutoStartUp).isFalse(); assertThat(streamsBuilderFactoryBean.isRunning()).isFalse(); } + @Configuration @EnableKafkaStreams public static class KafkaStreamsConfig { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java index ff26153b60..ced62aa24f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java @@ -23,6 +23,7 @@ import java.util.Properties; import java.util.regex.Pattern; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.junit.jupiter.api.Test; @@ -79,13 +80,10 @@ public void testStreamsBuilderFactoryWithConfigProvidedLater() throws Exception props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); streamsBuilderFactoryBean.setStreamsConfiguration(props); streamsBuilderFactoryBean.setAutoStartup(isAutoStartUp); - streamsBuilderFactoryBean.getObject().stream(Pattern.compile("foo")); assertThat(streamsBuilderFactoryBean.isRunning()).isFalse(); - boolean shouldAutoStartUp = streamsBuilderFactoryBean.isAutoStartup(); streamsBuilderFactoryBean.start(); assertThat(streamsBuilderFactoryBean.isRunning()).isTrue(); - assertThat(shouldAutoStartUp).isEqualTo(isAutoStartUp); } @Configuration @@ -100,6 +98,23 @@ public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() { return streamsBuilderFactoryBean; } + @Bean + public KafkaStreamsService kafkaStreamsService(StreamsBuilder streamsBuilder) { + return new KafkaStreamsService(streamsBuilder); + } + } + static class KafkaStreamsService { + private final StreamsBuilder streamsBuilder; + + public KafkaStreamsService(StreamsBuilder streamsBuilder) { + this.streamsBuilder = streamsBuilder; + buildPipeline(); + } + + public void buildPipeline() { + this.streamsBuilder.stream("foo"); + } + } } From 526392f167a7b66b21e5e9b03b630aff7864b33b Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Thu, 4 Apr 2024 07:02:22 +0900 Subject: [PATCH 5/5] fix lint error, and do not use 'foo' in the new code --- .../kafka/config/StreamsBuilderFactoryBeanTests.java | 3 +-- .../kafka/config/StreamsBuilderFactoryLateConfigTests.java | 7 +++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java index 5ef808961a..dd0f66efb1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java @@ -121,7 +121,7 @@ protected StreamsBuilder createInstance() { }; streamsBuilderFactoryBean.afterPropertiesSet(); StreamsBuilder builder = streamsBuilderFactoryBean.getObject(); - builder.stream(Pattern.compile("foo")); + builder.stream(Pattern.compile("test-topic")); // When streamsBuilderFactoryBean.afterSingletonsInstantiated(); @@ -131,7 +131,6 @@ protected StreamsBuilder createInstance() { assertThat(streamsBuilderFactoryBean.isRunning()).isFalse(); } - @Configuration @EnableKafkaStreams public static class KafkaStreamsConfig { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java index ced62aa24f..c412f155d9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java @@ -21,7 +21,6 @@ import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import java.util.Properties; -import java.util.regex.Pattern; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -108,13 +107,13 @@ public KafkaStreamsService kafkaStreamsService(StreamsBuilder streamsBuilder) { static class KafkaStreamsService { private final StreamsBuilder streamsBuilder; - public KafkaStreamsService(StreamsBuilder streamsBuilder) { + KafkaStreamsService(StreamsBuilder streamsBuilder) { this.streamsBuilder = streamsBuilder; buildPipeline(); } - public void buildPipeline() { - this.streamsBuilder.stream("foo"); + void buildPipeline() { + this.streamsBuilder.stream("test-topic"); } } }