
Introduce PollerMetadataCustomizer for customizing Spring Integration's PollerMetadata #44534

Closed
quaff opened this issue Mar 5, 2025 · 12 comments · May be fixed by #44637
Labels
status: superseded An issue that has been superseded by another

Comments

@quaff
Contributor

quaff commented Mar 5, 2025

I created a simple Spring Boot demo project to test remote partitioning with spring-integration-kafka, and found that the partition steps are executed sequentially:

2025-03-05T13:10:26.039+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition9] executed in 10s504ms
2025-03-05T13:10:36.561+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition4] executed in 10s504ms
2025-03-05T13:10:47.004+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition5] executed in 10s424ms
2025-03-05T13:10:57.413+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition6] executed in 10s392ms
2025-03-05T13:11:07.874+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition0] executed in 10s444ms
2025-03-05T13:11:18.320+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition1] executed in 10s427ms
2025-03-05T13:11:28.771+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition7] executed in 10s432ms
2025-03-05T13:11:39.225+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition2] executed in 10s438ms
2025-03-05T13:11:49.668+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition3] executed in 10s427ms
2025-03-05T13:12:00.120+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition8] executed in 10s436ms

I'd like them to be executed concurrently to reduce the total execution time. It doesn't work even when I configure spring.task.scheduling.pool.size > 1, because the taskScheduler in IntegrationAutoConfiguration.IntegrationTaskSchedulerConfiguration is not used by the default poller.
I'm a beginner with Spring Integration, so I'm not sure whether this is an oversight or intentional.

Here is the demo project: batch-demo.zip

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Mar 5, 2025
@wilkinsona
Member

Can you advise here please, @artembilan?

@wilkinsona wilkinsona added the status: waiting-for-internal-feedback An issue that needs input from a member or another Spring Team label Mar 5, 2025
@artembilan
Member

Thank you for the sample!
After fixing docker-compose.yml to use container_name: kafka I was able to run it, reproduce the issue, and debug it.
Without that change it fails like this:

Stderr:
 Network batch-demo_default  Creating
 Network batch-demo_default  Created
 Container batch-demo-mysql-1  Creating
 Container broker  Creating
Error response from daemon: Conflict. The container name "/broker" is already in use by container "e83f9eb5592384c33b16daec01f3fd4d65e1a49cc79a7546469adf1150ed6cdf". You have to remove (or rename) that container to be able to reuse that name.

	at org.springframework.boot.docker.compose.core.ProcessRunner.run(ProcessRunner.java:97) ~[spring-boot-docker-compose-3.4.3.jar:3.4.3]
	at org.springframework.boot.docker.compose.core.DockerCli.run(DockerCli.java:81) ~[spring-boot-docker-compose-3.4.3.jar:3.4.3]
	at org.springframework.boot.docker.compose.core.DefaultDockerCompose.up(DefaultDockerCompose.java:56) ~[spring-boot-docker-compose-3.4.3.jar:3.4.3]
	at org.springframework.boot.docker.compose.lifecycle.StartCommand.applyTo(StartCommand.java:51) ~[spring-boot-docker-compose-3.4.3.jar:3.4.3]

Not sure what that means, but that is not related to the subject.

So, in debug I see that this Spring Batch bean:

	@Bean
	Step workerStep(ItemReader<Integer> itemReader, ItemProcessor<Integer, Customer> itemProcessor, ItemWriter<Customer> itemWriter) {
		return stepBuilderFactory.get("workerStep")
				.inputChannel(inboundRequests())
				.<Integer, Customer>chunk(10, transactionManager)
				.reader(itemReader)
				.processor(itemProcessor)
				.writer(itemWriter)
				.build();
	}

creates a PollingConsumer for that QueueChannel inboundRequests bean.
And this one really does use the auto-configured TaskScheduler with your spring.task.scheduling.pool.size: 8 setting.
However, according to our logs:

2025-03-05T13:10:26.039+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition9] executed in 10s504ms
2025-03-05T13:10:36.561+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition4] executed in 10s504ms
2025-03-05T13:10:47.004+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition5] executed in 10s424ms
2025-03-05T13:10:57.413+08:00  INFO 34621 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition6] executed in 10s392ms

This endpoint still uses only one thread from that TaskScheduler, simply because the logic is:

public void setTrigger(Trigger trigger) {
	this.trigger = (trigger != null ? trigger : new PeriodicTrigger(Duration.ofMillis(DEFAULT_POLLING_PERIOD)));
}

Which means: start a new poll one second after the previous one has finished.
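As a stand-alone illustration of that fixed-delay semantic (plain java.util.concurrent only, not the Spring Integration API; all names below are invented for this sketch), the next poll is only submitted after the previous callback returns, so blocking work in the callback keeps everything serialized on one scheduler thread:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedDelayDemo {

	// Runs three fixed-delay "polls" whose callback blocks for 100 ms and
	// returns the maximum number of callbacks that were ever in flight at once.
	static int run() {
		ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
		AtomicInteger inFlight = new AtomicInteger();
		AtomicInteger maxInFlight = new AtomicInteger();
		CountDownLatch polls = new CountDownLatch(3);
		scheduler.scheduleWithFixedDelay(() -> {
			int now = inFlight.incrementAndGet();
			maxInFlight.accumulateAndGet(now, Math::max);
			try {
				Thread.sleep(100); // simulate a blocking partition step
			}
			catch (InterruptedException ex) {
				Thread.currentThread().interrupt();
			}
			inFlight.decrementAndGet();
			polls.countDown();
		}, 0, 10, TimeUnit.MILLISECONDS);
		try {
			polls.await();
		}
		catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
		scheduler.shutdownNow();
		return maxInFlight.get();
	}

	public static void main(String[] args) {
		// The next poll is scheduled only after the previous one returns,
		// so the maximum in-flight count is always 1.
		System.out.println("max concurrent polls = " + run());
	}
}
```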

So, to summarize your concern regarding IntegrationAutoConfiguration.IntegrationTaskSchedulerConfiguration: it works as designed, even if that is not what you expected.

When I added this property:

logging.level.org.springframework.integration.endpoint.PollingConsumer: debug

and waited long enough for the application to run, I saw this:

2025-03-05T09:07:05.821-05:00 DEBUG 81792 --- [   scheduling-1] o.s.i.endpoint.PollingConsumer           : Poll resulted in Message: GenericMessage [payload=StepExecutionRequest: [jobExecutionId=5, stepExecutionId=53, stepName=workerStep], headers={sequenceNumber=4, kafka_offset=8, sequenceSize=10, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@3a918b76, kafka_timestampType=CREATE_TIME, correlationId=5:workerStep, kafka_receivedPartitionId=2, kafka_receivedTopic=worker, kafka_receivedTimestamp=1741183545947, kafka_groupId=spring-batch-demo}]
2025-03-05T09:07:17.306-05:00  INFO 81792 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition0] executed in 11s462ms
2025-03-05T09:07:17.339-05:00 DEBUG 81792 --- [   scheduling-1] o.s.i.endpoint.PollingConsumer           : Poll resulted in Message: GenericMessage [payload=StepExecutionRequest: [jobExecutionId=5, stepExecutionId=50, stepName=workerStep], headers={sequenceNumber=5, kafka_offset=9, sequenceSize=10, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@3a918b76, kafka_timestampType=CREATE_TIME, correlationId=5:workerStep, kafka_receivedPartitionId=2, kafka_receivedTopic=worker, kafka_receivedTimestamp=1741183545947, kafka_groupId=spring-batch-demo}]
2025-03-05T09:07:28.779-05:00  INFO 81792 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition8] executed in 11s427ms
2025-03-05T09:07:28.802-05:00 DEBUG 81792 --- [   scheduling-1] o.s.i.endpoint.PollingConsumer           : Poll resulted in Message: GenericMessage [payload=StepExecutionRequest: [jobExecutionId=5, stepExecutionId=49, stepName=workerStep], headers={sequenceNumber=7, kafka_offset=9, sequenceSize=10, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@3a918b76, kafka_timestampType=CREATE_TIME, correlationId=5:workerStep, kafka_receivedPartitionId=1, kafka_receivedTopic=worker, kafka_receivedTimestamp=1741183545948, kafka_groupId=spring-batch-demo}]
2025-03-05T09:07:40.273-05:00  INFO 81792 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition1] executed in 11s460ms
2025-03-05T09:07:41.316-05:00 DEBUG 81792 --- [   scheduling-1] o.s.i.endpoint.PollingConsumer           : Received no Message during the poll, returning 'false'
2025-03-05T09:07:43.333-05:00 DEBUG 81792 --- [   scheduling-1] o.s.i.endpoint.PollingConsumer           : Received no Message during the poll, returning 'false'
2025-03-05T09:07:45.352-05:00 DEBUG 81792 --- [   scheduling-2] o.s.i.endpoint.PollingConsumer           : Received no Message during the poll, returning 'false'

That means the auto-configured TaskScheduler and poller work as expected.

I'm not well versed in Spring Batch, but it looks like the processing is blocked in one of those reader/processor/writer steps for those ~10s.

Now, about the TaskExecutor you mentioned in the issue subject.
Yes, there is indeed such an option on PollerMetadata. It has nothing to do with the TaskScheduler and is used for dispatching the real work to different threads, letting the scheduler thread move on to the next schedule.
However, the TaskExecutor is not exposed in any way by the Spring Integration auto-configuration.
That is probably what we can use as a goal for this issue, @wilkinsona.
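A plain java.util.concurrent sketch of what that taskExecutor dispatching buys (not the Spring API; the class and method names here are invented): the scheduler thread only drains the queue and hands each message off to a worker pool, so the blocking work overlaps instead of serializing:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PollerHandoffDemo {

	// Polls a queue of four messages; the poll callback hands each message to a
	// worker pool instead of processing it itself, then returns immediately.
	static int run() {
		BlockingQueue<Integer> channel = new LinkedBlockingQueue<>();
		for (int i = 0; i < 4; i++) {
			channel.add(i);
		}
		ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
		ExecutorService workers = Executors.newFixedThreadPool(4); // the "taskExecutor"
		AtomicInteger inFlight = new AtomicInteger();
		AtomicInteger maxInFlight = new AtomicInteger();
		CountDownLatch done = new CountDownLatch(4);
		scheduler.scheduleWithFixedDelay(() -> {
			while (channel.poll() != null) { // drain, maxMessagesPerPoll-style
				workers.execute(() -> {      // hand off; the scheduler thread is free again
					int now = inFlight.incrementAndGet();
					maxInFlight.accumulateAndGet(now, Math::max);
					try {
						Thread.sleep(200); // simulate a blocking partition step
					}
					catch (InterruptedException ex) {
						Thread.currentThread().interrupt();
					}
					inFlight.decrementAndGet();
					done.countDown();
				});
			}
		}, 0, 10, TimeUnit.MILLISECONDS);
		try {
			done.await();
		}
		catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
		scheduler.shutdownNow();
		workers.shutdown();
		return maxInFlight.get();
	}

	public static void main(String[] args) {
		// Several of the four blocking steps now run at the same time.
		System.out.println("max concurrent steps = " + run());
	}
}
```

The trade-off of handing work to an in-memory executor is that queued messages can be lost if the application crashes, which is discussed further down in the thread.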

Here is a workaround for your Spring Batch application, @quaff:

	@Bean
	@BridgeTo("requestsChannel")
	QueueChannel inboundRequests() {
		return new QueueChannel();
	}

	@Bean
	ExecutorChannel requestsChannel(TaskExecutor taskExecutor) {
		return new ExecutorChannel(taskExecutor);
	}

	@Bean
	Step workerStep(ItemReader<Integer> itemReader, ItemProcessor<Integer, Customer> itemProcessor, ItemWriter<Customer> itemWriter) {
		return stepBuilderFactory.get("workerStep")
				.inputChannel(requestsChannel(null))
				.<Integer, Customer>chunk(10, transactionManager)
				.reader(itemReader)
				.processor(itemProcessor)
				.writer(itemWriter)
				.build();
	}

And the logs look like this:

2025-03-05T09:38:45.392-05:00  INFO 83112 --- [         task-3] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition7] executed in 10s957ms
2025-03-05T09:38:45.401-05:00  INFO 83112 --- [         task-6] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition5] executed in 10s963ms
2025-03-05T09:38:45.408-05:00  INFO 83112 --- [         task-2] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition0] executed in 10s970ms
2025-03-05T09:38:45.420-05:00  INFO 83112 --- [         task-4] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition3] executed in 10s983ms
2025-03-05T09:38:45.427-05:00  INFO 83112 --- [         task-7] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition8] executed in 10s989ms
2025-03-05T09:38:45.435-05:00  INFO 83112 --- [         task-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition2] executed in 10s997ms
2025-03-05T09:38:45.435-05:00  INFO 83112 --- [         task-8] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition4] executed in 10s999ms
2025-03-05T09:38:45.435-05:00  INFO 83112 --- [         task-5] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition9] executed in 10s997ms

Sorry for the long comment: just wanted to clear things up!

@quaff
Contributor Author

quaff commented Mar 6, 2025

Thank you for the sample! After fixing docker-compose.yml to container_name: kafka I was able to run it, reproduce and debug.

It may be caused by an existing container having the same name; it works for me.

I'm not good with Spring Batch, but looks like the processing is blocked in one of those reader/processor/writer for those ~10s.

It's not related to Spring Batch; I added a delay in the processor for demonstration purposes.

The workaround is like this for your Spring Batch application

It works like a charm, thank you very much! @artembilan


@wilkinsona
Member

Thanks very much, @artembilan. To confirm my understanding, are you proposing that we update the auto-configured PollerMetadata to use the context's TaskExecutor if it has one?

@artembilan
Member

Forgot to mention:
There is a third dimension to the polling endpoint algorithm: maxMessagesPerPoll.
That means: ask the source for data in a single polling cycle until it returns null or this number is reached.
So, this was another aspect that contributed to the single-thread usage in your scenario, @quaff.
The PollingConsumer for that inboundRequests queue starts a polling cycle and loops until it has read all the messages.
And since every message's processing blocks downstream, the whole polling cycle stays blocked for quite a while.
The taskExecutor option on the poller lets us emit the messages of a maxMessagesPerPoll loop onto different threads.
The only problem with ExecutorChannel and the in-memory QueueChannel is that messages might be lost if the application crashes.
That is why we really don't recommend a lot of thread switching, even if it looks nice from a concurrency and configuration perspective.

With that in mind, I looked into your application one more time.
Apparently you do Spring Batch partitioning like this:

	@Bean
	Step partitionerStep() {
		return stepBuilderFactory.get("partitionerStep")
				.partitioner("workerStep", partitioner())
				.gridSize(10)
				.outputChannel(outboundRequests())
				.build();
	}

So, in the end that is going to be 10 workers.
With that in mind, I changed your topic config to the same value:

private static final int TOPIC_PARTITION_COUNT = 10;

So, if we are going to have 10 partitions in Kafka, it is reasonable to assume that we would like at least 10 consumers on those Kafka partitions.
Therefore I made this change:

				.from(Kafka.messageDrivenChannelAdapter(cf, workerTopic.name())
						.configureListenerContainer((container) -> container.concurrency(10)))

That means start 10 parallel Kafka consumers, one for each partition in the topic.
And finally this change:

	@Bean
	DirectChannel inboundRequests() {
		return new DirectChannel();
	}

So, a message consumed from the Kafka topic is processed on the same thread.
That ensures the data is not lost, especially messages that have not yet been consumed because the thread is blocked by the downstream process.
With all those changes I got this in logs:

2025-03-06T08:43:57.732-05:00  INFO 116956 --- [ntainer#0-6-C-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition7] executed in 11s591ms
2025-03-06T08:43:57.775-05:00  INFO 116956 --- [ntainer#0-1-C-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition0] executed in 11s628ms
2025-03-06T08:43:57.784-05:00  INFO 116956 --- [ntainer#0-2-C-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition4] executed in 11s637ms
2025-03-06T08:43:57.784-05:00  INFO 116956 --- [ntainer#0-7-C-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition1] executed in 11s637ms
2025-03-06T08:43:57.793-05:00  INFO 116956 --- [ntainer#0-3-C-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition3] executed in 11s645ms
2025-03-06T08:43:57.793-05:00  INFO 116956 --- [ntainer#0-5-C-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition8] executed in 11s649ms
2025-03-06T08:43:57.793-05:00  INFO 116956 --- [ntainer#0-8-C-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition5] executed in 11s649ms
2025-03-06T08:43:57.793-05:00  INFO 116956 --- [ntainer#0-0-C-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition6] executed in 11s650ms
2025-03-06T08:43:57.801-05:00  INFO 116956 --- [ntainer#0-4-C-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition9] executed in 11s654ms
2025-03-06T08:43:57.801-05:00  INFO 116956 --- [ntainer#0-9-C-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition2] executed in 11s652ms

So, all of them really are executed in parallel, on threads from the Kafka listener container.
All fine and robust.

Either way, I think having an auto-configuration property for the poller's executor still makes sense.
So, @wilkinsona, I think something like spring.integration.poller.use-application-task-executor = true should be good enough to let the end user opt in to injecting the applicationTaskExecutor bean into IntegrationAutoConfiguration.IntegrationConfiguration.defaultPollerMetadata().
It doesn't look like there is any condition on TaskExecutionAutoConfiguration that would prevent the applicationTaskExecutor bean from being registered.

@artembilan
Member

The LoggingHandler doesn't log the first message with sequenceNumber=0,

That is not correct, @quaff.
You just missed it in the logs, before the message is sent to Kafka:

2025-03-06T10:55:11.674-05:00  INFO 81396 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [partitionerStep]
2025-03-06T10:55:12.228-05:00  INFO 81396 --- [           main] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=StepExecutionRequest: [jobExecutionId=14, stepExecutionId=147, stepName=workerStep], headers={sequenceNumber=0, correlationId=14:workerStep, id=4edcc3a6-a218-f935-1c3e-d7bfb9fad206, sequenceSize=10, timestamp=1741276512225}]
2025-03-06T10:55:12.278-05:00  INFO 81396 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 

The rest of the messages in that sequence are sent to Kafka through the already-initialized producer.
So, all good.

Let's concentrate, in this issue, on the mentioned auto-configuration property for Spring Integration's PollerMetadata!

quaff added a commit to quaff/spring-boot that referenced this issue Mar 7, 2025
…pplication-task-executor"

Fix spring-projectsGH-44534

Signed-off-by: Yanming Zhou <zhouyanming@gmail.com>
@wilkinsona
Member

I'm not 100% sure about the proposed configuration property as it feels a bit unusual in a Spring Boot context. There are also precedents elsewhere in Spring for components picking up an executor based on bean name without Boot being involved.

Some existing cases:

  • The auto-configuration for GraphQL's AnnotatedControllerConfigurer always uses the application task executor if it exists in the context
  • When spring.data.jpa.repositories.bootstrap-mode is set to deferred or lazy, the context's AsyncTaskExecutor bean will be used. If there are multiple such beans, the one named applicationTaskExecutor is used
  • Framework's AsyncAnnotationBeanPostProcessor will use a unique TaskExecutor if available, falling back to an Executor named taskExecutor. One of the names of the auto-configured application task executor is taskExecutor
  • When virtual threads are enabled, WebFlux's blocked execution is auto-configured to use the application task executor if it's available
  • WebMvc's async support is auto-configured to use the application task executor if it's available
  • WebSocket's inbound and outbound channels are auto-configured to use the context's AsyncTaskExecutor bean. If there are multiple such beans, the one named applicationTaskExecutor is used

Given these existing cases, I would prefer to avoid the proposed configuration property if we can. If customising the poller's executor is a common need, perhaps it could offer automatic support as Framework's @Async support does? Another option is for Boot to always configure it to use the application task executor when it's available as happens in many of the cases above.

@artembilan
Member

Thank you for summarizing existing solutions, @wilkinsona !

Unfortunately this PollerMetadata.taskExecutor is a slightly different use case.
It is optional, and the end user must deliberately choose to opt in to this async work dispatching because of the data-loss concern mentioned before.

Sure! I'm also not very happy with the configuration property proposal.
I think we can follow the more well-established pattern in Spring Boot and introduce a PollerMetadataCustomizer instead,
similar to ThreadPoolTaskSchedulerCustomizer.

I see that we have more options there which cannot be covered by simple configuration properties:

	private ErrorHandler errorHandler;

	private List<Advice> adviceChain;

	private Executor taskExecutor;
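All of these options could be covered by a single customizer callback. A minimal stand-alone sketch of that pattern (the classes below are simplified placeholders mirroring the ThreadPoolTaskSchedulerCustomizer style, not the eventual Spring Boot/Integration API):

```java
import java.util.List;

class CustomizerSketch {

	// Simplified stand-in for Spring Integration's PollerMetadata.
	static class PollerMetadata {
		long maxMessagesPerPoll = 1;
	}

	// The proposed callback: Boot would collect all beans of this type and
	// apply them to the auto-configured default PollerMetadata.
	@FunctionalInterface
	interface PollerMetadataCustomizer {
		void customize(PollerMetadata pollerMetadata);
	}

	// What the auto-configuration would do: build the defaults, then apply
	// every registered customizer in order.
	static PollerMetadata defaultPollerMetadata(List<PollerMetadataCustomizer> customizers) {
		PollerMetadata metadata = new PollerMetadata();
		customizers.forEach((customizer) -> customizer.customize(metadata));
		return metadata;
	}

	public static void main(String[] args) {
		PollerMetadata metadata = defaultPollerMetadata(
				List.of((poller) -> poller.maxMessagesPerPoll = 10));
		System.out.println(metadata.maxMessagesPerPoll); // prints 10
	}
}
```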

Please let us know if you agree, @wilkinsona, and whether @quaff should rework his PR accordingly.

@quaff
Contributor Author

quaff commented Mar 10, 2025

I prefer PollerMetadataCustomizer, and I will rework the PR if @wilkinsona agrees with this solution.

@wilkinsona
Member

Thanks, @quaff. Yeah, I think the customizer's a good idea. As @artembilan points out, there are two other options that are also unsuited to configuration properties and a customiser would take care of all three.

@wilkinsona wilkinsona changed the title Autoconfigure taskExecutor for Spring Integration default poller Introduce PollerMetadataCustomizer for customizing Spring Integration's PollerMetadata Mar 10, 2025
@wilkinsona wilkinsona removed the status: waiting-for-internal-feedback An issue that needs input from a member or another Spring Team label Mar 10, 2025
quaff added a commit to quaff/spring-boot that referenced this issue Mar 10, 2025
Fix spring-projectsGH-44534

Signed-off-by: Yanming Zhou <zhouyanming@gmail.com>
@wilkinsona
Member

Closing in favor of #44637. Thanks for the PR, @quaff.

@wilkinsona wilkinsona closed this as not planned Mar 10, 2025
@wilkinsona wilkinsona added status: superseded An issue that has been superseded by another and removed status: waiting-for-triage An issue we've not yet triaged labels Mar 10, 2025