[SQS] Kotlin Coroutines support for @SqsListener #871

Open
gabfssilva opened this issue Aug 23, 2023 · 9 comments
Labels
component: sqs SQS integration related issue status: waiting-for-feedback Waiting for feedback from issuer type: enhancement Smaller enhancement in existing integration

Comments

@gabfssilva

gabfssilva commented Aug 23, 2023

Type: Feature

Is your feature request related to a problem? Please describe.
No, it's not.

Describe the solution you'd like
To create a new listener, the annotated method must return either void or CompletableFuture<Void>. That works well for most scenarios, including Kotlin (once you wrap your coroutine in a future { } block), but the SQS module could support other effect types as well, such as the one this feature request proposes (coroutines), and also Mono / Flux:

@SqsListener("hello")
suspend fun hello() { ... }

@SqsListener("hello")
fun hello(): Mono<Unit> { ... }

Some Spring modules can already inspect the type of the annotated function in order to execute it properly and extract the response:

// web flux & coroutines
@GetMapping
suspend fun hello() = "hello, world!"

// web flux & reactor
@GetMapping
fun hello() = Mono.just("hello, world!")

// declarative http clients & coroutines
@GetExchange("/hello")
suspend fun sendHello(): String

// declarative http clients & reactor
@GetExchange("/hello")
fun sendHello(): Mono<String>

... so, I believe it would make sense to introduce this capability to the SQS module. It's possible that other modules could benefit from this feature as well, although I'm not exactly sure which ones.

Describe alternatives you've considered
One obvious alternative is to simply return a completable future. While this works fine, it makes things more verbose:

val coroutineScope = CoroutineScope(Dispatchers.Default)

@SqsListener("hello")
fun onHello() = coroutineScope.future { // this function transforms a coroutine into a CompletableFuture
   ...
}

Additional context
I plan to examine the modules to better understand the viability of this feature.

@gabfssilva
Author

fyi @tomazfernandes

@tomazfernandes
Contributor

Hey @gabfssilva, nice to see you around!

The team decided a while back not to support Project Reactor, mainly because of our limited time to maintain the project, and in light of the upcoming Virtual Threads, which should address some reactive use cases.

Also, as you mentioned, the SQS integration already plays nicely with reactive code thanks to the CompletableFuture structure, which seems good enough.

I know you're a reactive enthusiast and probably feel differently 😄 Bear with us: this is already a big project to support without introducing a new programming paradigm to it.

All that being said, if you'd like to take a shot at a simple enough implementation without hard dependencies on the Reactor project, I'd be happy to take a look.

@tomazfernandes tomazfernandes added component: sqs SQS integration related issue status: waiting-for-feedback Waiting for feedback from issuer labels Sep 6, 2023
@gabfssilva
Author

Thanks for the answer, Tomaz!

Yep, you know me, I'm quite an enthusiast. 😄 It's okay to not support Project Reactor, although I'm pretty sure the implementation would be quite straightforward. I may be completely wrong here, but for this case, I think it would be a simple conversion between CompletableFuture and Mono.

It may seem unnecessary since this mapping I'm talking about can be easily done by calling .toFuture from Mono, but there's a catch that's worth mentioning:

@SqsListener("queue")
public Mono<Unit> myNiceListener() {
    return saveToDB(...).map(somethingElse(...)).flatMap(anotherThing(...))
}

This code compiles and runs just fine. Seeing it work, I would assume that SqsListener plays nicely with Mono. The catch is that the library only checks for CompletableFuture to decide whether processing is asynchronous. So although everything seems to work, messages are acknowledged before they have even been processed, which can definitely lead to message loss.

One alternative that avoids supporting Project Reactor would be to fail when the return type isn't void or CompletableFuture, which is fine IMHO. Although it still wouldn't support Project Reactor, it would make the library safer and state explicitly that you have to do the conversion yourself when you return an unsupported type. Given that Spring provides strong support for Project Reactor, the current behavior may seem somewhat unclear.
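A minimal sketch of the fail-fast idea, assuming a hypothetical validateListenerReturnType helper that would run while listener methods are registered. Neither the helper nor SampleListeners exists in Spring Cloud AWS; they only illustrate the check, and List<String> stands in for any unsupported type such as Mono.

```kotlin
import java.lang.reflect.Method
import java.util.concurrent.CompletableFuture

// Hypothetical sample listeners; the annotation is omitted to keep the sketch dependency-free.
class SampleListeners {
    fun sync(message: String) {}
    fun async(message: String): CompletableFuture<Void?> = CompletableFuture.completedFuture(null)
    fun unsupported(message: String): List<String> = listOf(message)
}

// Hypothetical check: accept only void or CompletableFuture return types.
fun validateListenerReturnType(method: Method) {
    val type = method.returnType
    val supported = type == Void.TYPE || CompletableFuture::class.java.isAssignableFrom(type)
    require(supported) {
        "Listener ${method.name} returns ${type.name}; expected void or CompletableFuture"
    }
}
```

With a check like this, a listener returning anything else would be rejected at startup instead of being treated as a synchronous listener whose result is ignored.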

Regarding coroutines, AFAIK, it's an optional dependency in Spring Core. The building blocks that Spring Core provides allow other modules to provide seamless support for coroutines as well, that's why I wonder if it could be also added here. Again, nothing is ever mandatory, as I said previously, it's not like it can't be done manually, but it's always nice to avoid having to add unnecessary conversions when it's possible. Considering that coroutines can be added as an optional dependency, I really think it's something that should be considered.

I'm working on a pull request myself, I'll probably send it in the next couple of days. To avoid sending lots of code at once, the PR is only for coroutine support. Later we can also discuss Project Reactor as well.

@tomazfernandes
Contributor

This code compiles and runs just fine

I would expect it not to work since the Publisher chain is not being subscribed to anywhere. What am I missing?

The tricky part of supporting Reactor in this structure is that, at least as I have thought about it, we would like to receive a Flux<Message> or Flux<POJO>, and likely return a Mono<Void>.

We can probably use a Sink for that, and push messages as we get them in the Listener part, using an Adapter similar to the Blocking ones we have.

Then we'd need to wire this up to the rest of the chain so that messages are properly sent down the rest of the pipeline, so that they can be acknowledged, release the back pressure callback, trigger error handling, etc.

So while it's definitely not impossible, I don't think it's that simple either, and it's something that has to be properly thought through and tested so we don't end up with a feature that doesn't work the way it's expected to.

the PR is only for coroutine support

Cool, thanks! I don't know the first thing about Kotlin TBH, but maybe you can walk me through how to run it and see it in action.

@gabfssilva
Author

I would expect it not to work since the Publisher chain is not being subscribed to anywhere. What am I missing?

That's actually correct; my bad. The only way it would fit into the scenario I spoke of would be if there were some eager processing before returning the Publisher, which I think is very unlikely.

As I said previously, Spring Core offers a straightforward way to invoke suspend functions:

var publisher = CoroutinesUtils.invokeSuspendingFunction(coroutineContext, method, bean, args);

As long as the method is marked as a coroutine (suspend), everything works just fine, and the publisher can be consumed so that the function is successfully invoked.
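For context on what "marked as a coroutine" means at the JVM level: the Kotlin compiler turns a suspend function into a method with an extra trailing kotlin.coroutines.Continuation parameter and an Object return type. The sketch below shows how that can be detected with plain reflection. SampleListener and isSuspendFunction are hypothetical names used only for illustration. Spring's own detection lives in KotlinDetector, as far as I know, and may check more than this.

```kotlin
import java.lang.reflect.Method
import kotlin.coroutines.Continuation

// Hypothetical listener with one suspend and one regular method.
class SampleListener {
    suspend fun suspending(message: String) { println("hello, $message") }
    fun blocking(message: String) { println("hello, $message") }
}

// A suspend function compiles to a method whose last parameter is a Continuation.
fun isSuspendFunction(method: Method): Boolean =
    method.parameterTypes.lastOrNull() == Continuation::class.java
```

This is also why a suspend listener cannot simply be passed through the existing void / CompletableFuture handling: to a reflective caller it looks like a two-argument method returning Object.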

The problem is that changing the actual method-invoking implementation is quite difficult. From the code snippets I've seen, it appears that if the reactive implementation were complete, coroutines would also be supported. I delved a bit into the code this weekend and noticed that there's an entirely different implementation for reactive programming, which I find quite intriguing. It seems like Spring Messaging doesn't share much code between its reactive and non-reactive APIs, which complicates things.

After that, I started to wonder:

  • Does it make sense to implement coroutine support without Reactor, considering that if Reactor were implemented, coroutines would work as well?
  • Is the effort required to make the library work with Reactor so significant that it would make sense to implement coroutines separately, given that there are no plans to implement Reactor at all?

I've noticed that everything being parsed (body, headers, etc.) is specific to either the reactive or non-reactive approach. That's why I think it would take a considerable amount of time.

Once again, the goal is to make this possible:

class HelloListener { 
    @SqsListener("hello") 
    suspend fun hello(message: String) = println("hello, $message") 
} 

Instead of this:

class HelloListener { 
    val scope = CoroutineScope(Dispatchers.Default) 

    @SqsListener("hello") 
    fun hello(message: String): CompletableFuture<Unit> = 
        scope.future { 
            println("hello, $message")
        }
}

So, while I see the benefits, it might not be worth implementing it without Reactor, especially if there's a possibility that it could be implemented in the future.

I want to hear a little bit of your thoughts on this, @tomazfernandes. I'm 100% available for a quick chat if you're up for it.

@tomazfernandes
Contributor

Hey @gabfssilva!

I've noticed that everything being parsed (body, headers, etc.) is specific to either the reactive or non-reactive approach. That's why I think it would take a considerable amount of time.

The approach I thought of for supporting Reactor in SCAWS SQS would definitely not require reimplementing all of this. The integration itself is already non-blocking, so we'd just need to support the parts that integrate with user code.

The thing with Reactor for this project is that it's a whole other programming paradigm we'd need to support, and then the inevitable questions of why don't we support Reactor for all the other integrations, and so on. Keep in mind that after implementing it, we still need to support it.

Given that Reactor usage tends to decrease overall with Virtual Threads, we chose not to support it, though we could revisit this decision if a different scenario appears.

Is the effort required to make the library work with Reactor so significant that it would make sense to implement coroutines separately, given that there are no plans to implement Reactor at all?

Given my lack of experience with Kotlin, it's hard for me to measure effort for it. From the snippets you provided, the first one looks interesting, though I don't think the second one is all that bad. If it's simple enough to create this integration, I'd say it's worth a shot.

I'm 100% available for a quick chat if you're up for it.

I think our chat is long overdue! I'm not sure I'll be able to make the time this week or the next one, but if not let's talk after that.

@gabfssilva
Author

Given that Reactor usage tends to decrease overall with Virtual Threads, we chose not to support it, though we could revisit this decision if a different scenario appears.

I believe they serve quite different purposes, though. I would say Loom will simplify the creation of simple non-blocking I/O applications, but features such as pipelines, reactive streams, backpressure and so on are simply not part of what Loom sets out to solve. As a matter of fact, Loom may even simplify the implementation of reactive stream libraries, so I wouldn’t say it will make Reactor usage less common. As far as I can tell, it could even amplify their usage, since Loom allows imperative structures alongside reactive streams, much like what coroutines provide for Kotlin. It’s a philosophical discussion. 😅

I think our chat is long overdue! I'm not sure I'll be able to make the time this week or the next one, but if not let's talk after that.

Let’s schedule then. I’ll ping you when you’re ready. Thank you very much!

@tomazfernandes tomazfernandes added the type: enhancement Smaller enhancement in existing integration label Jan 5, 2024
@elkhart

elkhart commented May 3, 2024

Hey @gabfssilva,
thanks for bringing this topic up. This is a very interesting discussion.
We're using this library all over the place since we base almost all our async communication on SNS/SQS.
We also decided to use Kotlin over Java a couple of years back.
We have now looked into the Kotlin-based async version of SqsListener.

Above you wrote

class HelloListener { 
    val scope = CoroutineScope(Dispatchers.Default) 

    @SqsListener("hello") 
    fun hello(message: String): CompletableFuture<Unit> = 
        scope.future { 
            println("hello, $message")
        }
}

Is this code really working for you?
When implementing a listener like this we run into a ClassCastException like

11:32:00  13:32:00.097 ERROR [outine#119] .s.AbstractMessageProcessingPipelineSink: Error processing message db320739-ace6-4e8a-914b-fa3708faa838.
11:32:00  java.util.concurrent.CompletionException: io.awspring.cloud.sqs.listener.ListenerExecutionFailedException: Listener failed to process message
11:32:00  	at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368)
11:32:00  	at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377)
11:32:00  	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152)
11:32:00  	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
11:32:00  	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
11:32:00  	at kotlinx.coroutines.future.CompletableFutureCoroutine.onCompleted(Future.kt:53)
11:32:00  	at kotlinx.coroutines.AbstractCoroutine.onCompletionInternal(AbstractCoroutine.kt:90)
11:32:00  	at kotlinx.coroutines.JobSupport.tryFinalizeSimpleState(JobSupport.kt:293)
11:32:00  	at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:857)
11:32:00  	at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:829)
11:32:00  	at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:97)
11:32:00  	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
11:32:00  	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
11:32:00  	at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:111)
11:32:00  	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:99)
11:32:00  	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:585)
11:32:00  	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:802)
11:32:00  	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:706)
11:32:00  	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:693)
11:32:00  Caused by: io.awspring.cloud.sqs.listener.ListenerExecutionFailedException: Listener failed to process message
11:32:00  	at io.awspring.cloud.sqs.listener.pipeline.MessageListenerExecutionStage.lambda$process$1(MessageListenerExecutionStage.java:51)
11:32:00  	at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990)
11:32:00  	at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974)
11:32:00  	... 16 common frames omitted
11:32:00  Caused by: java.util.concurrent.CompletionException: java.lang.ClassCastException: class kotlin.Unit cannot be cast to class java.lang.Void (kotlin.Unit is in unnamed module of loader 'app'; java.lang.Void is in module java.base of loader 'bootstrap')
11:32:00  	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
11:32:00  	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
11:32:00  	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
11:32:00  	... 16 common frames omitted
11:32:00  Caused by: java.lang.ClassCastException: class kotlin.Unit cannot be cast to class java.lang.Void (kotlin.Unit is in unnamed module of loader 'app'; java.lang.Void is in module java.base of loader 'bootstrap')
11:32:00  	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
11:32:00  	... 16 common frames omitted

The only workaround we found looks like this:

class HelloListener { 
    val scope = CoroutineScope(Dispatchers.Default) 

    @SqsListener("hello") 
    fun hello(message: String): CompletableFuture<Void?> = 
        scope.future { 
            println("hello, $message")
            null
        }
}

Sorry for hijacking the thread, but this troubled us quite a lot, and it would be nice to know whether there is a cleaner way of implementing this in Kotlin.
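The last cause in the trace can be reproduced without the library. This is a minimal sketch under the assumption that the pipeline ends up treating the returned future as a CompletableFuture<Void>. The function names are hypothetical. Because generic arguments are erased at runtime, the cast of the future itself succeeds, and the failure only appears when the completed value is read as Void.

```kotlin
import java.util.concurrent.CompletableFuture

// Reading the result forces a cast of the completed value to java.lang.Void.
fun readResult(future: CompletableFuture<Void?>): Void? = future.get()

// Returns the exception raised when a CompletableFuture<Unit> is consumed as CompletableFuture<Void>.
fun consumeUnitFutureAsVoid(): Throwable? {
    val unitFuture: CompletableFuture<*> = CompletableFuture.completedFuture(Unit)

    // Unchecked cast: nothing is verified here because of type erasure.
    @Suppress("UNCHECKED_CAST")
    val asVoid = unitFuture as CompletableFuture<Void?>

    return try {
        readResult(asVoid)
        null
    } catch (e: ClassCastException) {
        e // kotlin.Unit cannot be cast to java.lang.Void
    }
}
```

In the trace above the same cast happens inside a CompletableFuture callback, which is why it surfaces wrapped in a CompletionException. Completing the future with null avoids it, since null is the only value a Void reference can hold.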

@gabfssilva
Author

gabfssilva commented May 9, 2024

@elkhart, I confess I don't remember exactly, but the ClassCastException does make sense, since Unit != Void. If I'm not mistaken, you could potentially use AsyncMessageListener to improve the readability a little bit by hiding the implementation inside an abstract class:

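abstract class CoMessageListener<T> : AsyncMessageListener<T> {
    val scope = CoroutineScope(Dispatchers.Default)

    // Bridges the suspend function to the CompletableFuture the container expects.
    override fun onMessage(message: Message<T>): CompletableFuture<Void?> =
        scope.future {
            coOnMessage(message)
            null // complete with null, since kotlin.Unit cannot be cast to java.lang.Void
        }

    // Subclasses implement the actual message handling here.
    abstract suspend fun coOnMessage(message: Message<T>)
}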

And then:

class HelloListener : CoMessageListener<String>() {
    override suspend fun coOnMessage(message: Message<String>) =
        println("hello, ${message.payload}!")
}

While it doesn't remove the need for the Void? trick, at least it's placed in a single file. I'm not sure if you can still use the @SqsListener annotation. You might need to register a MessageListenerContainer manually.
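If keeping the annotation matters, the same trick can be hidden in a small helper instead of a base class. The sketch below is hypothetical and deliberately uses only the Kotlin standard library so that it stands on its own. voidFuture is not part of Spring Cloud AWS or kotlinx.coroutines, and it runs the block on the calling thread until its first suspension because no dispatcher is involved. With kotlinx.coroutines on the classpath, an equivalent helper could presumably wrap scope.future { block(); null } instead.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical helper: starts the suspend block and exposes its outcome as CompletableFuture<Void?>.
fun voidFuture(block: suspend () -> Unit): CompletableFuture<Void?> {
    val future = CompletableFuture<Void?>()
    block.startCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext = EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            result
                .onSuccess { future.complete(null) }              // Unit is dropped, Void is always null
                .onFailure { future.completeExceptionally(it) }   // propagate listener failures
        }
    })
    return future
}

class HelloListener {
    // In a real application this method would carry @SqsListener("hello").
    fun hello(message: String): CompletableFuture<Void?> = voidFuture {
        println("hello, $message")
    }
}
```

The listener body then stays free of the trailing null, and a failure inside the block completes the future exceptionally rather than being lost.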


3 participants