-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create topology before kafka streams start. #3172
Changes from 2 commits
193dbb7
e65e6ff
20ba75d
9fafa67
526392f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2018-2020 the original author or authors. | ||
* Copyright 2018-2024 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|
@@ -92,20 +92,56 @@ public void testCleanupStreams() throws IOException { | |
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add your name as an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Aye, i added my name classes below.
|
||
|
||
@Test | ||
public void testBuildWithProperties() throws Exception { | ||
public void testBuildWithPropertiesAndAutoStartUp() throws Exception { | ||
boolean autoStartUp = true; | ||
streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) { | ||
@Override | ||
protected StreamsBuilder createInstance() { | ||
return spy(super.createInstance()); | ||
} | ||
}; | ||
streamsBuilderFactoryBean.setAutoStartup(autoStartUp); | ||
streamsBuilderFactoryBean.afterPropertiesSet(); | ||
StreamsBuilder builder = streamsBuilderFactoryBean.getObject(); | ||
builder.stream(Pattern.compile("foo")); | ||
streamsBuilderFactoryBean.start(); | ||
|
||
|
||
boolean isAutoStartUp = streamsBuilderFactoryBean.isAutoStartup(); | ||
if (isAutoStartUp) { | ||
streamsBuilderFactoryBean.start(); | ||
} | ||
|
||
StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject(); | ||
verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties()); | ||
assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull(); | ||
assertThat(isAutoStartUp).isTrue(); | ||
assertThat(streamsBuilderFactoryBean.isRunning()).isTrue(); | ||
} | ||
|
||
@Test | ||
public void testBuildWithPropertiesAndNoAutoStartUp() throws Exception { | ||
boolean autoStartUp = false; | ||
streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) { | ||
@Override | ||
protected StreamsBuilder createInstance() { | ||
return spy(super.createInstance()); | ||
} | ||
}; | ||
streamsBuilderFactoryBean.setAutoStartup(autoStartUp); | ||
streamsBuilderFactoryBean.afterPropertiesSet(); | ||
StreamsBuilder builder = streamsBuilderFactoryBean.getObject(); | ||
builder.stream(Pattern.compile("foo")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Try not to use names like |
||
|
||
boolean isAutoStartUp = streamsBuilderFactoryBean.isAutoStartup(); | ||
if (isAutoStartUp) { | ||
streamsBuilderFactoryBean.start(); | ||
} | ||
|
||
StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject(); | ||
verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties()); | ||
assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull(); | ||
assertThat(isAutoStartUp).isFalse(); | ||
assertThat(streamsBuilderFactoryBean.isRunning()).isFalse(); | ||
} | ||
|
||
@Configuration | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2018-2020 the original author or authors. | ||
* Copyright 2018-2024 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing |
||
|
@@ -72,15 +72,19 @@ public void testStreamBuilderFactoryCannotBeInstantiatedWhenAutoStart() { | |
|
||
@Test | ||
public void testStreamsBuilderFactoryWithConfigProvidedLater() throws Exception { | ||
boolean isAutoStartUp = true; | ||
Properties props = new Properties(); | ||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); | ||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); | ||
streamsBuilderFactoryBean.setStreamsConfiguration(props); | ||
streamsBuilderFactoryBean.setAutoStartup(isAutoStartUp); | ||
streamsBuilderFactoryBean.getObject().stream(Pattern.compile("foo")); | ||
|
||
assertThat(streamsBuilderFactoryBean.isRunning()).isFalse(); | ||
boolean shouldAutoStartUp = streamsBuilderFactoryBean.isAutoStartup(); | ||
streamsBuilderFactoryBean.start(); | ||
assertThat(streamsBuilderFactoryBean.isRunning()).isTrue(); | ||
assertThat(shouldAutoStartUp).isEqualTo(isAutoStartUp); | ||
} | ||
|
||
@Configuration | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be better to add this to the
afterPropertiesSet()
method instead of here. Doing it here adds more responsibilities to theisAutoStartup()
method, which should technically only return the currentautoStartup
status. Also, we need to call theconfigureTopology(topology)
method immediately after the topology creation so that some clients, such as theTopologyTestDriver
, may use a fully configured topology.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, please add your name as an
author
of this class.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this comment is not addressed? https://github.com/spring-projects/spring-kafka/pull/3172/files#r1546662939
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sobychacko, I'm so sorry, i missed this comments.
IMHO,
afterPropertiesSet()
is not proper workaround.streamBuilder#stream()
seems to be called afterStreamsBuilderFactoryBean#afterPropertiesSet()
are executed.When
StreamsBuilderFactoryBean#afterPropertiesSet()
is called, no stream declarations are made in theStreamBuilder
yet. Therefore, executingStreamsBuilder.build()
at the time of the afterPropertiesSet() call does not create any topology.And, Unfortunately, it seems that there is no appropriate hooking point. (spring-projects/spring-framework#32554)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And i agree that
isAutoStartup()
has much responsibilities in this case.If we move
configureTopology(topology)
toisAutoStartup()
, much responsibilities as well.However, as i said above, we cannot execute
streamsBuilder#build()
properly in scope ofafterPropertiesSet()
.Please refer to image above.
This is timeline of
KafkaStreams
withspring-kafka
.As you can see, In
StreamsBuilderFactoryBean#afterPropertiesSet()
,streamsBuilder
is just about to be created andstreamsBuilder
does not have any stream DSL.Thus
streamsBuilder#build()
is executed inStreamsBuilderFactoryBean#afterPropertiesSet()
, there is no DSL to build onstreamsBuilder
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just commented on your issue in Spring Framework.
Let's see if
SmartInitializingSingleton.afterSingletonsInstantiated()
implementation help us somehow!There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@artembilan Hi!
Thanks your comment.
SmartInitializingSingleton.afterSingletonsInstantiated()
works well.I modified code to use
SmartInitializingSingleton.afterSingletonsInstantiated()
and test code as well.Please Take a look. 🙇♂️
CC. @sobychacko