Allows to initialize/inject the Topology object without starting the Kafka streams. #3020

Closed
foal opened this issue Feb 8, 2024 · 10 comments

@foal
foal commented Feb 8, 2024

Expected Behavior

Get the Topology from StreamsBuilderFactoryBean without starting the streams engine. The Topology has to be created with the properties and the KafkaStreamsInfrastructureCustomizer applied, and it should pick up any future changes to that logic.

This is needed to run tests with the Kafka TopologyTestDriver.

Current Behavior

I can inject the StreamsBuilder, but then I need to create and initialize the topology manually.

Context

Manual workaround:

    @Autowired
    private StreamsBuilder streamBuilder;
    @Autowired
    private KafkaStreamsConfiguration streamConfig;
    @Autowired
    private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer;

///....

        Topology topology = streamBuilder.build(streamConfig.asProperties());
        infrastructureCustomizer.configureTopology(topology);

But of course, this matches only the current version of StreamsBuilderFactoryBean.

@sobychacko
Contributor

That is an interesting request. Could you elaborate on the use case that requires it?

@artembilan
Member

I wonder why you need the StreamsBuilderFactoryBean if you create the topology manually.
There is also PR #1853, which does something similar with TopologyTestDriver.

The StreamsBuilderFactoryBean can also be configured with setAutoStartup(false).

So, we do need more info on how to proceed.
Perhaps a fully formed test would be a great start to look into.

Thanks

@foal
Author

foal commented Feb 8, 2024

I do not create the StreamsBuilderFactoryBean manually. In the real application, everything is configured automatically.
The issue is in the tests. I have three options:

  • Use a Spring Boot test with Testcontainers. Kafka Streams runs exactly as it does in the application, but the tests take much more time and resources (the application works with 2 Kafka clusters), and they are very difficult to write and unstable due to the asynchronous nature of Kafka.
  • Use a plain JUnit test with the Kafka TopologyTestDriver. This is much faster and more predictable, but I need to duplicate the configuration, the topology setup (I have it as a set of beans too), etc. to run it. It also doesn't test the configuration from properties files and other things that Spring handles in the real application.
  • Use a Spring Boot test, but use the TopologyTestDriver instead of starting Kafka Streams. This is my case: a reasonable balance between the full fidelity to the real application of the 1st option and the lightweight, simpler testing of the 2nd.

In general, the test looks like this (simplified):

@TestPropertySource(properties = {
    "spring.kafka.bootstrap-servers=mock://appTopologyTest",
    "spring.kafka.streams.application-id=app-test-${random.uuid}",
    "spring.kafka.properties.schema.registry.url=mock://appTopologyTest",
    "spring.kafka.streams.properties.schema.registry.url=mock://appTopologyTest",
    "app.kafka.topic-offer-replicas=1",
    "spring.kafka.streams.auto-startup=false",
    "spring.kafka.listener.auto-startup=false",
    "logging.level.org.springframework.kafka.core.KafkaAdmin=OFF",
    "logging.level.io.confluent.kafka.serializers=WARN",
    "logging.level.org.apache.kafka.clients.admin=WARN",
})
@SpringBootTest
public class AppTopologyTest {

    // Configuration for the input/output topics goes here

    @Autowired
    private StreamsBuilder streamsBuilder;
    @Autowired
    private KafkaStreamsConfiguration kafkaStreamsConfiguration;
    @Autowired
    private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer;

    @ParameterizedTest(name = "{0}")
    @MethodSource
    protected void testProcessing(String info, List<InputNew<?, ?>> input, Matcher<Iterable<TestRecord>> expected, TopicType out) {

        // This is the tricky part: I need to duplicate the logic from
        // https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java#L347
        Topology topology = streamsBuilder.build(kafkaStreamsConfiguration.asProperties());
        infrastructureCustomizer.configureTopology(topology);

        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, kafkaStreamsConfiguration.asProperties())) {

            // Use the diamond operator instead of a raw HashMap
            Map<TopicType, TestInputTopic<?, ?>> inputTopics = new HashMap<>(10);

            input.forEach(i -> send(testDriver, inputTopics, i));

            TestOutputTopic outputTopic = getOutTopic(testDriver, out);

            List<TestRecord> output = outputTopic.readRecordsToList();

            MatcherAssert.assertThat(output, expected);
        }

    }

    @SuppressWarnings("unused")
    private static List<Arguments> testProcessing() {
        int i = 1;
        return List.of(
            testSteps(i++, "Duplicates", OUT, MATCHER_1, IN_MSG_1, IN_MSG_1),
            testSteps(i++, "Single ent", OUT, MATCHER_2, IN_MSG_3),
            testSteps(i++, "Single another ent", OUT, MATCHER_3, IN_MSG_4),
            testSteps(i++, "Comb", OUT,  MATCHER_4, IN_MSG_1, IN_MSG_2, IN_MSG_3, IN_MSG_4),
            //....
            testSteps(i++, "Nothing", OUT, MATCHER_NOTHING));
    }

    private <K, V> void send(TopologyTestDriver testDriver, Map<TopicType, TestInputTopic<?, ?>> inputTopics, InputNew<?, ?> input) {
        TestInputTopic<K, V> topic = getInputTopic(testDriver, inputTopics, input.getTopicType());
        topic.pipeInput(new TestRecord(input.getKey(), input.getValue()));
    }

    @SafeVarargs
    protected static Arguments testSteps(int iteration, String description, TopicType out,
                                         MatcherFn matcher, Function<Integer, InputNew>... inputs) {
        List<InputNew> parameters = StreamEx.of(inputs).map(input -> input.apply(iteration)).toList();
        return Arguments.of(description, parameters, matcher.apply(iteration), out);
    }
}

@artembilan
Member

OK.
So, the idea is to have this logic in the StreamsBuilderFactoryBean:

Topology topol = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
this.infrastructureCustomizer.configureTopology(topol);
this.topology = topol;

moved out of the start() scope, e.g. into afterPropertiesSet()?
And then have the StreamsBuilderFactoryBean not start automatically, so that no KafkaStreams instance is created there.

If this is the plan, we will be happy to accept such a contribution.

Thanks
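
For illustration only, the separation discussed in this comment can be modeled without any Kafka or Spring dependency. This is a minimal sketch under stated assumptions: SketchTopology, SketchStreamsFactory, and TopologyLifecycleSketch are hypothetical stand-ins, not spring-kafka classes, and the method names merely mirror the afterPropertiesSet()/start() split mentioned above. It shows the topology being built and customized during initialization, so that it is available while the factory is still not running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;

// Demo entry point (hypothetical): initialize the factory, never start it.
final class TopologyLifecycleSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("application.id", "app-test");

        SketchStreamsFactory factory =
                new SketchStreamsFactory(props, topology -> topology.addStep("customized"));
        factory.afterPropertiesSet();

        System.out.println("topology steps: " + factory.getTopology().steps());
        System.out.println("running: " + factory.isRunning());
    }
}

// Hypothetical stand-in for a topology: it only records how it was configured.
final class SketchTopology {
    private final List<String> steps = new ArrayList<>();

    void addStep(String step) {
        steps.add(step);
    }

    List<String> steps() {
        return List.copyOf(steps);
    }
}

// Hypothetical stand-in for a factory bean with separate init and start phases.
final class SketchStreamsFactory {
    private final Properties properties;
    private final Consumer<SketchTopology> customizer;
    private SketchTopology topology;
    private boolean running;

    SketchStreamsFactory(Properties properties, Consumer<SketchTopology> customizer) {
        this.properties = properties;
        this.customizer = customizer;
    }

    // Initialization phase: build and customize the topology, but start nothing.
    void afterPropertiesSet() {
        SketchTopology topol = new SketchTopology();
        topol.addStep("built with application.id=" + properties.getProperty("application.id"));
        customizer.accept(topol);
        this.topology = topol;
    }

    // Start phase: a real implementation would create and start the streams engine here.
    void start() {
        if (topology == null) {
            throw new IllegalStateException("afterPropertiesSet() must run before start()");
        }
        running = true;
    }

    SketchTopology getTopology() {
        return topology;
    }

    boolean isRunning() {
        return running;
    }
}
```

With this split, a test could read the topology right after initialization and hand it to a test driver, while start() stays uncalled.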

@foal
Author

foal commented Feb 13, 2024

OK. I'll try to provide a PR next week.

@foal
Author

foal commented Mar 1, 2024

Grr... I can't load the project into Eclipse due to eclipse-jdt/eclipse.jdt.core#1621

It should be fixed in the next Eclipse release (4.31), though I'm not sure...

@sobychacko sobychacko added this to the 3.2.0-RC1 milestone Mar 19, 2024
@chickenchickenlove
Contributor

@artembilan, @sobychacko, @foal
Hi, Spring Kafka Team and reporter.
I'm interested in this issue as well.

If this is on hold, may I take over this issue?
I'm willing to do the work once the reporter and the maintainers give their OK.

@foal
Author

foal commented Mar 30, 2024

Yes, please!

@chickenchickenlove
Contributor

@foal Thanks a lot 🙇‍♂️
I will make a PR for this. 👍

@chickenchickenlove
Contributor

#3172
I created a new PR.
Please take a look 🙇‍♂️


4 participants