
Fix corePoolSize so that maximum number of messages (maxConcurrentMessages * number of queues) are processed simultaneously. #833

Merged
merged 7 commits on Jun 25, 2023

Conversation

mokamoto12
Contributor

@mokamoto12 mokamoto12 commented Jun 5, 2023

📢 Type of change

  • Bugfix
  • New feature
  • Enhancement
  • Refactoring

📜 Description

Ensure that messages are processed concurrently, up to maxConcurrentMessages * number of queues.
The value of corePoolSize is changed so that new threads are created even when the ThreadPoolTaskExecutor's queue is not yet full.

💡 Motivation and Context

According to the documentation, maxConcurrentMessages * number of queues messages are expected to be processed simultaneously.

The maximum number of messages from each queue that can be processed simultaneously in this container. This number will be used for defining the thread pool size for the container following (maxConcurrentMessages * number of queues).

ref. https://docs.awspring.io/spring-cloud-aws/docs/3.0.0/reference/html/index.html#sqscontaineroptions-descriptions

However, before this change, only maxMessagesPerPoll messages were processed simultaneously, rather than maxConcurrentMessages * number of queues.

We suspect this is caused by the documented behavior of ThreadPoolExecutor and was introduced in commit 30a4c4d.
ref. https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.html
ref. https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ThreadPoolExecutor.html
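The ThreadPoolExecutor behavior in question — once corePoolSize threads exist, new tasks are queued, and extra threads up to maximumPoolSize are only created when the queue is full — can be reproduced with a small standalone sketch (plain JDK, no Spring; the large queue stands in for the container's queue, and all names are illustrative):

```java
import java.util.concurrent.*;

public class CorePoolSizeDemo {
    // Submits `tasks` blocking tasks and reports how many threads the pool created.
    static int threadsCreated(int corePoolSize, int maxPoolSize, int tasks)
            throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try { release.await(); } catch (InterruptedException ignored) { }
            });
        }
        // With a queue that is not full, threads exist only up to corePoolSize;
        // the remaining tasks sit in the queue instead of triggering new threads.
        int created = pool.getPoolSize();
        release.countDown();
        pool.shutdown();
        return created;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(threadsCreated(1, 20, 20));  // prints 1  - 19 tasks queue behind one thread
        System.out.println(threadsCreated(20, 20, 20)); // prints 20 - one thread per task
    }
}
```

This is why raising corePoolSize (rather than maximumPoolSize) is what restores the intended concurrency.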

💚 How did you test it?

Executing the following code verifies that 20 messages are processed simultaneously, instead of one.

    @SqsListener(queueNames = "${queue-url}", pollTimeoutSeconds = "20", maxConcurrentMessages = "20", maxMessagesPerPoll = "1")
    public void listen(String message) {
        logger.info(String.format("start: %s", message));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        logger.info(String.format("end  : %s", message));
    }

📝 Checklist

  • I reviewed submitted code
  • I added tests to verify changes
    • I could not find an existing test for this behavior. If there is one, please let me know; otherwise, I would appreciate advice on how best to test it.
  • I updated reference documentation to reflect the change
  • All tests passing
    • I would like to check the results with CI.
  • No breaking changes
    • Is this considered a breaking change? It is difficult for me to judge, so I would appreciate your advice.

🔮 Next steps

@github-actions github-actions bot added the component: sqs SQS integration related issue label Jun 5, 2023
@tomazfernandes
Contributor

Sorry for the delay @mokamoto12, and thanks for bringing this up.

You're right - while this ThreadPoolExecutor behavior of enqueueing tasks before creating up to maxPoolSize threads makes sense from a resource-usage perspective, it's not that intuitive. This behavior was overlooked when the queue was added, which resulted in the bug.

Do you think we could add an integration test to assert this behavior?

Not sure how we'd do it though, maybe send 20 messages and hold each thread until all 20 messages are received, so we make sure they're being processed simultaneously?
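The suggested test could be sketched roughly like this (a simplified, SQS-free illustration using a plain executor in place of the listener container; all names are hypothetical): each handler waits on a CyclicBarrier that only trips once all messages have arrived, so the check succeeds only if the messages really run simultaneously.

```java
import java.util.concurrent.*;

public class ConcurrencyTestSketch {
    // Returns true if `messages` handlers manage to run at the same time:
    // each handler blocks on a barrier that only trips when all have arrived.
    static boolean processedSimultaneously(int corePoolSize, int messages, long timeoutMs)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, messages, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
        CyclicBarrier allArrived = new CyclicBarrier(messages);
        CountDownLatch done = new CountDownLatch(messages);
        for (int i = 0; i < messages; i++) {
            pool.execute(() -> {
                try {
                    allArrived.await(timeoutMs, TimeUnit.MILLISECONDS);
                    done.countDown();
                } catch (Exception e) {
                    // timed out or barrier broken: the concurrency level was never reached
                }
            });
        }
        boolean ok = done.await(2 * timeoutMs, TimeUnit.MILLISECONDS);
        pool.shutdownNow();
        return ok;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processedSimultaneously(20, 20, 2000)); // all handlers run at once
        System.out.println(processedSimultaneously(1, 20, 2000));  // tasks queue, barrier never trips
    }
}
```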

Please let me know your thoughts. Thanks.

@mokamoto12
Contributor Author

Thank you for the confirmation @tomazfernandes. I will add the test.
Please forgive me for pushing a few useless commits to check the test results in CI as the test does not work in my environment.

@mokamoto12
Contributor Author

@tomazfernandes I have added a test. Before the corePoolSize modification the test failed, and after the modification I confirmed that it passes.

@jgslima

jgslima commented Jun 21, 2023

Wow. This is a very serious issue. To limit the concurrency to only 10 for each application instance would be very bad.

I am trying to get familiar with some of the classes. It seems that SemaphoreBackPressureHandler is aware of the maxConcurrentMessages configuration (reflected in its totalPermits attribute), and it seems that it already limits, somehow, the message polling done by AbstractPollingMessageSource.

Maybe the maxConcurrentMessages restriction might be imposed only by the backPressureHandler, with the ThreadPoolExecutor actually being configured without a pre-defined size? Like a "cachedThreadPool"?

But I am not sure whether the backpressure acts only in the polling step. Anyway, it does not seem very optimal to have two distinct places in the infrastructure enforcing the same restriction (maxConcurrentMessages).

@tomazfernandes tomazfernandes (Contributor) left a comment

PR looks great and thanks for the test @mokamoto12!

TIL about CyclicBarrier, had never heard of it, thanks.

I just left one comment - please see if it makes sense to you.

Thanks!

@Autowired
LatchContainer latchContainer;

@SqsListener(queueNames = MAX_CONCURRENT_MESSAGES_QUEUE_NAME, maxMessagesPerPoll = "1", maxConcurrentMessages = "10", id = "max-concurrent-messages")
Contributor

Can we have this test with maxMessagesPerPoll to 10, maxConcurrentMessages to 20 and 20 messages please?

That would represent what I believe to be a more common scenario.

Contributor Author

That makes sense. I have fixed it.
It seemed that the test would not pass unless the list of messages was sent twice, so that is what I did.

@tomazfernandes
Contributor

Hey @jgslima, nice to see you here!

Wow. This is a very serious issue. To limit the concurrency to only 10 for each application instance would be very bad.

Yeah, that would be bad - fortunately it's 10 per container rather than per application, so not so dramatic with default settings.

Maybe the maxConcurrentMessages restriction might be imposed only by the backPressureHandler, with the ThreadPoolExecutor actually being configured without a pre-defined size? Like a "cachedThreadPool"?

It's not a good practice to have an unbounded ThreadPoolExecutor, since that can lead to performance and OOM issues. Better to let the Executor reject tasks so we can see we have an issue and take action.

But I am not sure whether the backPressure actuates only in the polling step. Anyway, it does not seem very optimal to have 2 distinct places of the infra-structure enforcing the same restriction (maxConcurrentMessages).

The thread pool configuration is really only to make sure we can handle enough threads, it's not supposed to enforce a restriction. The restriction is in the SemaphoreBackPressureHandler class only.
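The split described here — the semaphore enforcing maxConcurrentMessages while the thread pool is merely sized to match it as a failsafe — could be sketched roughly as follows (a simplified illustration, not the actual SemaphoreBackPressureHandler code; all names are hypothetical):

```java
import java.util.concurrent.*;

public class BackPressureSketch {
    private final Semaphore permits;       // the real restriction: one permit per in-flight message
    private final ExecutorService workers; // sized to match, but only as a failsafe

    BackPressureSketch(int maxConcurrentMessages) {
        this.permits = new Semaphore(maxConcurrentMessages);
        this.workers = Executors.newFixedThreadPool(maxConcurrentMessages);
    }

    // Poll at most as many messages as there are free permits.
    int pollBatch(int maxMessagesPerPoll) {
        int acquired = 0;
        while (acquired < maxMessagesPerPoll && permits.tryAcquire()) {
            acquired++;
        }
        return acquired;
    }

    // The permit is returned only after processing completes, not after polling.
    void process(Runnable handler) {
        workers.execute(() -> {
            try {
                handler.run();
            } finally {
                permits.release();
            }
        });
    }
}
```

With this shape, a permit leak would eventually surface as rejected tasks at the bounded pool rather than as unbounded thread growth.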

Feel free to ask if you have any other questions!

@jgslima

jgslima commented Jun 21, 2023

Hello Tomaz!

The enhancements in the 3.0 version are very good. Congratulations, and thank you.

When we used the 2.x version, we actually had to extend and change the container class, because the behaviour of blocking until the whole message batch had been processed before polling again was not robust. If a single message got stuck due to, for instance, a database lock, that application instance would essentially stop polling messages from that queue. We implemented a form of backpressure (not as sophisticated as yours).

Now, with a configuration like Executors.newCachedThreadPool(), the maximumPoolSize is in fact Integer.MAX_VALUE. However, since the threads are actually reused (being discarded after the keepAliveTime), if the application itself enforces a maximum number of concurrent tasks, you get a pool that simply follows the concurrency level the application needs. A cachedThreadPool is not the same as, say, a SimpleAsyncTaskExecutor, as the threads are in fact reused. In many moments it may actually have fewer threads than a fixed-size thread pool configured with maxConcurrentMessages * number of queues, because it would only reach that number of threads if the application really reaches this level of concurrency and stays at it.
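The thread reuse described above can be observed directly (a minimal, timing-based sketch against the JDK's ThreadPoolExecutor; names are illustrative):

```java
import java.util.concurrent.*;

public class CachedPoolReuseDemo {
    // Runs `rounds` short tasks one after another on a cached pool and
    // returns the largest number of live threads seen at any point.
    static int maxPoolSize(int rounds) throws Exception {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        int max = 0;
        for (int i = 0; i < rounds; i++) {
            Future<?> f = pool.submit(() -> { });
            f.get();          // wait for completion so the worker goes idle
            Thread.sleep(50); // give the idle worker time to start polling for new work
            max = Math.max(max, pool.getPoolSize());
        }
        pool.shutdown();
        return max;
    }

    public static void main(String[] args) throws Exception {
        // Sequential tasks keep reusing the same idle worker, so only one thread is ever created.
        System.out.println("threads created for 10 sequential tasks: " + maxPoolSize(10));
    }
}
```

The pool only grows when tasks actually overlap, which is the reuse property being argued for here.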

I am not trying to insist on anything here, just having a conversation. Anyway, the framework already allows the application to provide its own executor.

@tomazfernandes
Contributor

The enhancements in the 3.0 version are very good. Congratulations, and thank you.

Hey @jgslima, thanks!

When we used the 2.x version, we actually had to extend and change the container class...

Yeah, I think we've all been there 😄 Hopefully no one will need to do this anymore! 🙌🏼

Now, with a configuration like Executors.newCachedThreadPool(), the maximumPoolSize is in fact Integer.MAX_VALUE. However, since the threads are actually reused (being discarded after the keepAliveTime), if the application itself enforces a maximum number of concurrent tasks, you get a pool that simply follows the concurrency level the application needs.

I think your reasoning makes sense, but IMO there are some tradeoffs in place that would not be good for having an unbounded executor.

If all goes well with concurrency control it doesn't make much difference if we're capping the top limit or not. If the limit is 20, we'll never have more than 20 threads, and it doesn't make a difference to have a thread limit of 20 or Integer.MAX_VALUE, right?

But let's say we have an issue in concurrency control that leads to a permit leak and now we can't enforce a limit anymore. I'd rather have an explicit task rejected exception than having the user app malfunction and eventually die of a mysterious OOM.

So that's why I think the limit is important as a failsafe. I wouldn't even have the queue if possible, but then there's a race condition between releasing the permit and releasing the thread that would be too complex to get rid of.

A cachedThreadPool is not the same as, say, a SimpleAsyncTaskExecutor, as the threads are in fact reused. In many moments it may actually have fewer threads than a fixed-size thread pool configured with maxConcurrentMessages * number of queues, because it would only reach that number of threads if the application really reaches this level of concurrency and stays at it.

Yes, and that's exactly the way the ThreadPoolTaskExecutor works as well. At first it spawns no threads. Then it tries reusing them. If there are not enough threads alive, it will spawn new threads up to maximumPoolSize. Threads that are idle for 60 seconds are discarded.

Does this make sense to you, or maybe I'm missing something?

Thanks!

@tomazfernandes tomazfernandes (Contributor) left a comment

Thanks a lot for the PR @mokamoto12! Looking forward to more!
