Add possibility to use Virtual Threads in MessageListenerContainerFactory #924

Open
lazystone opened this issue Oct 27, 2023 · 5 comments
Labels
component: sqs SQS integration related issue type: enhancement Smaller enhancement in existing integration

Comments

@lazystone

I tried to configure SqsMessageListenerContainerFactory to use VirtualThreadTaskExecutor:

    @Bean
    SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
        return SqsMessageListenerContainerFactory.builder()
                .configure(options -> options.componentsTaskExecutor(
                                new VirtualThreadTaskExecutor("sqs-message-listener-container-components-"))
                        .acknowledgementResultTaskExecutor(new VirtualThreadTaskExecutor(
                                "sqs-message-listener-container-acknowledgement-result-")))
                .sqsAsyncClient(sqsAsyncClient)
                .build();
    }

But this does not work:

Caused by: java.util.concurrent.CompletionException: io.awspring.cloud.sqs.UnsupportedThreadFactoryException: Custom TaskExecutors must use a MessageExecutionThreadFactory.
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
	at java.base/java.lang.VirtualThread.run(VirtualThread.java:311) ~[?:?]
Caused by: io.awspring.cloud.sqs.UnsupportedThreadFactoryException: Custom TaskExecutors must use a MessageExecutionThreadFactory.
	at io.awspring.cloud.sqs.listener.AbstractPipelineMessageListenerContainer.verifyThreadType(AbstractPipelineMessageListenerContainer.java:146) ~[spring-cloud-aws-sqs-3.0.2.jar:3.0.2]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]

Would it be possible to relax this somehow, so that VirtualThreadTaskExecutor can be used?

@maciejwalkowiak maciejwalkowiak added component: sqs SQS integration related issue type: enhancement Smaller enhancement in existing integration labels Oct 27, 2023
@maciejwalkowiak
Contributor

Sounds like something we should do.

@tomazfernandes
Contributor

Well, this restriction is there for a reason :)

I'll need to give some thought to how to handle this properly.

@tomazfernandes
Contributor

Hi everyone. Just to share some of the issues here.

The Thread Hopping Phenomenon

The threading model this solution uses works like this: when we receive a batch of messages, we send them through the MessageSink component and use the provided or default TaskExecutor to create the threads that will process the messages.

The problem is that if any step in message execution (an interceptor, the message listener itself, or error handling) returns a CompletableFuture that completes on a different thread, execution continues on that thread instead of the original one.

To give a simple example: one might have a non-blocking interceptor that uses an async client from the AWS SDK to, e.g., fetch something from S3 and add it to the message, and that returns a CompletableFuture.

Execution will then continue on the S3AsyncClient's thread, and if the listener is a blocking listener, it will quickly starve the S3 client's threads, leading to severely decreased throughput and an unresponsive client.
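To make the hop visible, here is a minimal sketch using only the JDK. The two single-thread pools and their thread names are hypothetical stand-ins for the container's executor and an async SDK client's internal pool; nothing here is Spring Cloud AWS code. A latch keeps the simulated S3 call pending until the pipeline is fully wired, so the outcome does not depend on timing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHoppingDemo {

    /** Returns the name of the thread that ends up running the "listener" stage. */
    static String listenerThreadName() {
        // Hypothetical stand-ins: one pool for the container, one for an async SDK client.
        ExecutorService containerPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "container-thread"));
        ExecutorService clientPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "client-thread"));
        // Keeps the simulated S3 call pending until the whole pipeline is wired up.
        CountDownLatch pipelineWired = new CountDownLatch(1);
        try {
            CompletableFuture<String> listenerThread = CompletableFuture
                    // Message processing starts on the container's thread.
                    .supplyAsync(() -> "message", containerPool)
                    // "Interceptor": an async call that completes on the client's thread.
                    .thenCompose(msg -> CompletableFuture.supplyAsync(() -> {
                        awaitQuietly(pipelineWired);
                        return msg + " + S3 data";
                    }, clientPool))
                    // "Listener": a plain continuation runs on whichever thread completed the stage.
                    .thenApply(msg -> Thread.currentThread().getName());
            pipelineWired.countDown();
            return listenerThread.join();
        } finally {
            containerPool.shutdown();
            clientPool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(listenerThreadName()); // prints "client-thread"
    }
}
```

The listener stage runs on the client's thread, so a blocking listener would occupy that pool's only thread for as long as it runs.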

So the way I got around this was by creating our own ThreadFactory: if a component's execution returns on a different thread, we hop back onto one of ours. This guarantees the user won't run into this problem, which would likely be really difficult to trace back.
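The hop-back idea can be sketched roughly as follows. This is not the library's implementation: `ContainerThread`, `hopBack` and the pool names are hypothetical, and the marker thread class only plays the role that threads created by MessageExecutionThreadFactory play in the real code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class HopBackSketch {

    /** Hypothetical marker type for threads owned by the container. */
    static final class ContainerThread extends Thread {
        ContainerThread(Runnable task, String name) {
            super(task, name);
        }
    }

    /**
     * Returns a future that always completes on a container thread:
     * in place if we are already on one, otherwise by re-dispatching to the executor.
     */
    static <T> CompletableFuture<T> hopBack(CompletableFuture<T> stage, Executor containerExecutor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        stage.whenComplete((value, error) -> {
            Runnable complete = () -> {
                if (error != null) {
                    result.completeExceptionally(error);
                } else {
                    result.complete(value);
                }
            };
            if (Thread.currentThread() instanceof ContainerThread) {
                complete.run();
            } else {
                containerExecutor.execute(complete);
            }
        });
        return result;
    }

    /** Same pipeline shape as the S3 interceptor example, with a hop back before the listener. */
    static String listenerThreadName() {
        ExecutorService containerPool =
                Executors.newSingleThreadExecutor(r -> new ContainerThread(r, "container-thread"));
        ExecutorService clientPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "client-thread"));
        // Keeps the simulated async call pending until the pipeline is wired up.
        CountDownLatch pipelineWired = new CountDownLatch(1);
        try {
            CompletableFuture<String> afterInterceptor = CompletableFuture
                    .supplyAsync(() -> "message", containerPool)
                    .thenCompose(msg -> CompletableFuture.supplyAsync(() -> {
                        awaitQuietly(pipelineWired);
                        return msg;
                    }, clientPool));
            CompletableFuture<String> listenerThread = hopBack(afterInterceptor, containerPool)
                    .thenApply(msg -> Thread.currentThread().getName());
            pipelineWired.countDown();
            return listenerThread.join();
        } finally {
            containerPool.shutdown();
            clientPool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(listenerThreadName()); // prints "container-thread"
    }
}
```

Completing a fresh future from the container's executor, rather than composing with a future that might already be done, avoids a race in which the continuation would still run on the foreign thread.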

Enter Virtual Threads

I'm confident we can do something similar with virtual threads - we should be able to simply check when we return that we are in a Virtual Thread, and if we're not, we can hop back into one.

I haven't had a chance to look into how other Spring projects managed to include Virtual Thread support without raising the Java baseline from Java 17; if we can do the same, we can simply check the thread type. We could of course use a workaround such as checking the class name or something similar, but hopefully they have something better than that.
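For reference, one workaround of that kind is a reflective lookup of `Thread.isVirtual()`, which compiles against Java 17 and only finds the method on newer runtimes. This is a sketch of the workaround, not a claim about how other Spring projects do it; the class name `VirtualThreadCheck` is hypothetical.

```java
import java.lang.reflect.Method;

final class VirtualThreadCheck {

    // Resolved once; null on runtimes where Thread has no isVirtual() method.
    private static final Method IS_VIRTUAL = lookupIsVirtual();

    private static Method lookupIsVirtual() {
        try {
            return Thread.class.getMethod("isVirtual");
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    /** Returns true only if the runtime knows about virtual threads and this is one. */
    static boolean isVirtual(Thread thread) {
        if (IS_VIRTUAL == null) {
            return false;
        }
        try {
            return (Boolean) IS_VIRTUAL.invoke(thread);
        } catch (ReflectiveOperationException | RuntimeException e) {
            // Treat any reflection failure as "not virtual" rather than failing the caller.
            return false;
        }
    }

    public static void main(String[] args) {
        // The main thread is a platform thread on every JDK version.
        System.out.println(isVirtual(Thread.currentThread())); // prints "false"
    }
}
```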

Conclusion

So there you have it - if anyone knows how Virtual Thread support works in other Spring projects, I'd be happy to collaborate on adding this to the project.

Thanks.

@tomazfernandes
Contributor

Thanks for the info @jhinch-at-atlassian-com!

Seems like we'd need this multi-target approach. I wonder if this is possible with Maven as well?

If we can use a similar approach, we might be able to lift the MessageExecutionThreadFactory restriction and check whether the thread is virtual instead, hopping back onto a virtual thread if the thread returned from SqsAsyncClient is not one.


4 participants