[fix][broker] Rate-Limiter fails with a huge spike in traffic, and publish/consume stuck for a long time #24002

Open · wants to merge 1 commit into base: master

Conversation

@rdhabalia rdhabalia commented Feb 19, 2025

Fixes #24001

Motivation

Fundamental Issues Identified:

1. Eventual Consistency Across Multiple Threads:

Each thread maintains its own counter for pendingConsumedTokens, which gets aggregated periodically.
During a spike, multiple producers concurrently increase pendingConsumedTokens, leading to an aggregated count that significantly exceeds the configured rate.

2. Deep Negative Token Count Problem:

Since tokens are refreshed at fixed intervals, a large negative value requires multiple refresh cycles to recover.
This leads to long pauses, causing the publish/dispatch system to get stuck.

[Image: heap-dump data showing a deeply negative token count]

Necessary check to avoid long recovery times and unexpected throttling behavior:

Bound negative token values: implement a threshold that prevents tokens from going excessively negative, limiting recovery time.
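To make the recovery-time concern concrete: at a refill rate of 2,000 tokens per second, a balance of −7,200,000 tokens needs a full hour of refills before it turns positive again. Below is a minimal, purely illustrative sketch of the bounding idea; this is not the actual patch, and the standalone method and its names are made up for illustration.

    // Illustrative sketch only: clamp the aggregated token balance so that a spike can
    // never push the bucket further negative than one rate period's worth of tokens,
    // which caps the recovery delay at roughly a single refresh cycle.
    static long clampNegativeTokens(long currentTokens, long ratePerPeriod) {
        long lowerBound = -ratePerPeriod;           // maximum "debt" allowed
        return Math.max(currentTokens, lowerBound);
    }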

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

lhotari commented Feb 19, 2025

The problem fixed by #23930 will prevent excessive negative tokens. I'll provide more details in the review.

@lhotari (Member) left a comment

This change isn't needed since the problem is already fixed by #23930.
Before #23930 there was a bug in the token calculation where the added tokens were incorrectly limited to the bucket size before subtracting consumed tokens since the last update. That caused the excessive negative tokens.

It is an incorrect assumption that tokens are always positive in the AsyncTokenBucket solution; allowing the token value to go negative is part of the design. This is explained in PIP-322: Pulsar Rate Limiting Refactoring.

Since the problem is already fixed, please test with #23930 changes. The test in this PR is invalid due to negative tokens being an important part of the PIP-322: Pulsar Rate Limiting Refactoring design.

@rdhabalia (Contributor, Author) commented:

First, the issue we are discussing already occurred after merging patch #23930. I spent a good amount of time deeply analyzing the problem and found that multiple threads can attempt to consume tokens, leading to negative token accumulation. While I understand that negative tokens can occur, the concern here is that a large number of negative tokens takes considerable time to recover to a positive state, sometimes exceeding an hour, delaying publish/consume operations.

I am quite disappointed and saddened that the core points of discussion are consistently overlooked or ignored. This happened earlier while discussing a risk-free approach, and now again while addressing a fundamental problem in the eventual consistency workflow. Instead of discussing the core issue, the conversation gets sidetracked by minor implementation details—such as the assumption about negative tokens—resulting in the PR being blocked.

I have no objections to the PR being blocked, as I have seen similar fundamental and critical PRs blocked in the past by individuals from the same org—only to be merged years later when the drawbacks of the alternative approach became apparent. While I had concerns about this feature and other PRs, I did not block them. If blocking the PR is the way you prefer to operate, so be it.

Now, coming back to the main issue:

a. Did this issue arise after merging patch #23930?
Yes. The problem is reproducible by calling the consumeToken API from multiple threads, which naturally occurs when multiple producers are publishing simultaneously.

b. Does having a large number of negative tokens break the existing rate-limiting behavior?
Yes. Previously, if a broker had a rate limit of 2K messages per second, it consistently dispatched 2K messages even during traffic spikes. This behavior is now broken.

c. Are we still seeing issues after merging #23930?
Yes. I have explained the reasons and provided tests demonstrating the issue.

d. Should we ensure that producers and consumers do not remain stuck for prolonged periods?
Absolutely yes. This is a significant deviation from the previous behavior, and I am surprised that it has not been discussed more seriously.

I have invested my time to highlight this issue, propose alternatives, and stress the need for a risk-free approach in a system like Pulsar. Ignoring these concerns will only lead to the issue resurfacing later, as we've seen before. PRs will sit idle until the problem becomes unavoidable, at which point we will be forced to revisit the same discussion.

I might have the privilege of running a patched version for myself, but not all organizations can do the same. It is our responsibility to ensure stable, pain-free releases for everyone.

lhotari commented Feb 20, 2025

First, the issue we are discussing already occurred after merging patch #23930. I spent a good amount of time deeply analyzing the problem and found that multiple threads can attempt to consume tokens, leading to negative token accumulation. While I understand that negative tokens can occur, the concern here is that a large number of negative tokens takes considerable time to recover to a positive state, sometimes exceeding an hour, delaying publish/consume operations.

Thanks for sharing these details. I think that we'll be able to make progress together.

The reason why I thought that you hadn't tested with #23930 is that issue report #24001 mentions "3.x".

Would it be possible to share details about the issue? We'd first need a reproducer in the OSS project. Since you have reproduced the issue after patch #23930, could you also please share more details so that we can create such a reproducer?

Once we have a reproducer, we will be able to add tests to prevent such regressions in the future.

The test added in this PR is invalid and doesn't represent the issue about publish and consuming being stuck for a long time. I guess we can agree about that?
At the unit level of AsyncTokenBucket, there is no such bug that you are addressing in this PR. It's possible that there's a problem somewhere else in the overall solution. It's also possible that this PR resolves your problem only as a side effect.

I am quite disappointed and saddened that the core points of discussion are consistently overlooked or ignored.

What is overlooked or ignored?

This happened earlier while discussing a risk-free approach, and now again while addressing a fundamental problem in the eventual consistency workflow. Instead of discussing the core issue, the conversation gets sidetracked by minor implementation details—such as the assumption about negative tokens—resulting in the PR being blocked.

The risk-free approach in Pulsar is mainly about the LTS release policy. The PIP-322 feature was included in Pulsar 3.2.0 which was a minor release. Pulsar 3.2.0 was released over a year ago and there haven't been any issue reports about rate limiting until very recently.

The reason why a feature toggle wasn't added for PIP-322 features is that adding conditionals in this area will complicate the solution and make Pulsar maintenance harder over the years. It's possible that certain behaviors of the old solution are something that some users have been depending on. Avoiding breaking changes is hard since many might also depend on undocumented and buggy behaviors.

When someone upgrades their Pulsar deployment, they should prepare for downgrading if there are issues. As mentioned above, PIP-322 was introduced over a year ago. The problems caused are not creating disasters, since there would be more issue reports if that were the case. By now, there have only been 2 related issue reports: #23920 and #24001. Both issues seem to have similar symptoms.

The motivation I have for PIP-322 is about making Pulsar a competitive alternative for Confluent's Kora. In Confluent Kora, according to the Kora paper, they have "backpressure and auto-tuning" as well as "dynamic quota management". The PIP-322 rate limiting is a foundation which would allow adding "dynamic quota management" into Pulsar and many other QoS features. Having feature toggles and unmaintainable code around will make this transition harder.

For Pulsar users who prioritize stability, they should stick to LTS releases. Perhaps there should be a separate status for a new LTS release when it is claimed stable. This would help users looking for stability decide when to upgrade. Pulsar 3.0.x LTS is actively maintained at least until May, with security support until May 2026.

I have no objections to the PR being blocked, as I have seen similar fundamental and critical PRs blocked in the past by individuals from the same org—only to be merged years later when the drawbacks of the alternative approach became apparent. While I had concerns about this feature and other PRs, I did not block them. If blocking the PR is the way you prefer to operate, so be it.

As mentioned before, the test in this PR doesn't represent the issue. The test is invalid since AsyncTokenBucket's design is that it's eventually consistent. It's valid that the tokens become negative. A better way to approach the problem you are facing is to first create a reproducer for "with a huge spike in traffic, publish/consume stuck for a long time". The first reproducer could be manual. @rdhabalia Could we work together to achieve that? On Pulsar Slack, there's also the huddle option to make video calls. Every 2nd week there's the Pulsar community meeting where we can also meet.

b. Does having a large number of negative tokens break the existing rate-limiting behavior? Yes. Previously, if a broker had a rate limit of 2K messages per second, it consistently dispatched 2K messages even during traffic spikes. This behavior is now broken.

Once we have the reproducer, it will help to understand this detail. One possibility is that the previous implementation wasn't a strict rate limiting implementation and it simply ignored sudden traffic spikes. Have you considered that possibility?

c. Are we still seeing issues after merging #23930? Yes. I have explained the reasons and provided tests demonstrating the issue.

Replied above. Please provide instructions on how to reproduce this with Pulsar. If that is not possible, please explain the details of your use case and scenario, with relevant information about the number of brokers, number of topics/partitions, consumers per partition, producers per partition, and subscription type. Sharing topic stats and stats-internal would also be very useful, as many other reporters do.

d. Should we ensure that producers and consumers do not remain stuck for prolonged periods? Absolutely yes. This is a significant deviation from the previous behavior, and I am surprised that it has not been discussed more seriously.

We'll find a solution to the problem. I'm committed to that.

I have invested my time to highlight this issue, propose alternatives, and stress the need for a risk-free approach in a system like Pulsar. Ignoring these concerns will only lead to the issue resurfacing later, as we've seen before. PRs will sit idle until the problem becomes unavoidable, at which point we will be forced to revisit the same discussion.

As mentioned above, the risk-free approach is to stick to a stable LTS release. The gap we currently have is how to announce a new LTS release as stable. Risk-free approaches also require the involvement of Pulsar users. Testing early and often will help improve Pulsar and reduce risk in LTS releases. Testing Pulsar releases would also be an important contribution to the Apache Pulsar project.

I might have the privilege of running a patched version for myself, but not all organizations can do the same. It is our responsibility to ensure stable, pain-free releases for everyone.

Every project has bugs, and Pulsar is actively maintained. The risk-free and pain-free approach is about improving Pulsar's LTS release policy and upgrade guides. We need more volunteers to help with that. For organizations that require more than what the OSS project offers, there are commercial providers that can help.

Getting back to this issue: let's first reproduce "with a huge spike in traffic, and publish/consume stuck for a long time" and that will take us forward. Are you fine with that?

@nodece (Member) left a comment

From the shared data, it's evident that the token count is negative, and it only becomes positive again after multiple rounds of token refreshing. This behavior can lead to unacceptable delays in reaching a positive value.

This PR improves that process by ensuring the tokens can quickly become positive after the first round of refreshing. This reduces unnecessary delays and enhances overall efficiency. I think this addresses the issue effectively, so LGTM.

I think we need to refactor this test, for example:

    @Test
    void testRefreshNegativeToken() throws Exception {
        int rate = 2000;
        int resolutionTimeMillis = 8;
        // Token bucket with a rate of 2000 tokens/second and an 8 ms snapshot resolution.
        asyncTokenBucket = AsyncTokenBucket.builder().rate(rate).ratePeriodNanos(TimeUnit.SECONDS.toNanos(1)).clock(
                        new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(resolutionTimeMillis),
                                System::nanoTime))
                .build();
        // Drive the balance deeply negative (5x the per-second rate) by injecting pending consumed tokens.
        asyncTokenBucket.getPendingConsumedTokens().add(rate * 5);
        Thread.sleep(2 * 1000);
        // Expect tokens to be available again after two refresh periods.
        assertTrue(asyncTokenBucket.consumeTokensAndCheckIfContainsTokens(100));
    }

lhotari commented Feb 20, 2025

From the shared data, it's evident that the token count is negative, and it only becomes positive again after multiple rounds of token refreshing. This behavior can lead to unacceptable delays in reaching a positive value.

@nodece Yes, before #23930, there was such a problem. @rdhabalia hasn't confirmed whether the screenshots of the heap dump were from a Pulsar version with #23930. The issue report says that the version was 3.x, so based on that, #23930 wasn't included.

This PR improves that process by ensuring the tokens can quickly become positive after the first round of refreshing. This reduces unnecessary delays and enhances overall efficiency. I think this addresses the issue effectively, so LGTM.

@nodece I have explained why this is not how AsyncTokenBucket works and is designed. If this PR were merged, it would break AsyncTokenBucket, and it would no longer be an implementation of the token bucket algorithm with a specified behavior. Let's first reproduce the actual issue "with a huge spike in traffic, and publish/consume stuck for a long time" and then draw conclusions.

Reproducing the actual issue will most likely reveal new information. One detail about the old implementation is that there wasn't a proper specification of its behavior. Due to implementation details, it seems that sudden traffic spikes were simply ignored. This is a useful feature for some use cases, and adding a feature to PIP-322 to make this configurable is a potential future improvement. It would be very simple to implement in Pulsar with the AsyncTokenBucket. Currently the bucket size is the same as the rate for the rate period. By making the bucket size configurable separately from the rate, it would be possible to tolerate traffic spikes for a configurable period of time. Nothing needs to change in AsyncTokenBucket since it already supports a configurable bucket size via its capacity property. We'd simply have to expose this in configuration.
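For illustration, a rough sketch of what such a burst-tolerant bucket configuration could look like, assuming the builder exposes the capacity property described above; the capacity(...) setter and exact builder wiring are assumptions, mirroring the test snippet earlier in this thread.

    // Illustrative only: configure a bucket whose capacity is larger than one rate
    // period's worth of tokens so that short bursts can be absorbed before throttling.
    // The capacity(...) builder method is assumed per the description above.
    int ratePerSecond = 2000;
    long burstCapacity = ratePerSecond * 5L; // tolerate roughly 5 seconds of burst
    AsyncTokenBucket burstTolerantBucket = AsyncTokenBucket.builder()
            .rate(ratePerSecond)
            .ratePeriodNanos(TimeUnit.SECONDS.toNanos(1))
            .capacity(burstCapacity)
            .clock(new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(8), System::nanoTime))
            .build();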

TakaHiR07 commented Feb 21, 2025

Our cluster also uses the new rate limiter from PIP-322. In our tests, the previous rate limiter was actually not a strict rate limiter implementation: without negative tokens, tokens were added every second and the broker would re-enable the Netty autoRead config, which made the rate limiter non-strict. I guess this point has been described in PIP-322.

And I guess the issue described in this PR also exists: a very huge spike in traffic (much larger than the configured rate) may lead to a large negative token count. For me, this issue may be inconsequential, because we use the broker rate limiter to prevent abnormal traffic from crashing the broker. If the traffic is a bit larger than the configured rate, it recovers after a short time. If the traffic is very abnormal, it should get stuck so that we can check out the reason.

In general, preventing tokens from going negative does not seem like an appropriate solution; it just goes back to the previous rate limiter implementation, which could not limit the traffic as we expected.

nodece commented Feb 21, 2025

I have explained why this is not how AsyncTokenBucket works and is designed.

Sorry, I overlooked the broker's behavior of re-reading network packets from the channel.

In the current implementation of the AsyncTokenBucket, the tokens only become positive after multiple rounds of refreshing. This behavior is unacceptable, as it introduces unnecessary delays in processing. The primary concern here is that I can tolerate a surge in traffic during the next rate limit cycle, but I cannot accept no traffic at all.

with a huge spike in traffic, and publish/consume stuck for a long time

We can directly increase pendingConsumedTokens, please check my test.

@rdhabalia (Contributor, Author) commented:

This behavior is unacceptable, as it introduces unnecessary delays in processing. The primary concern here is that I can tolerate a surge in traffic during the next rate limit cycle, but I cannot accept no traffic at all.

Yes. That's what I have been saying from the beginning in the Slack channel, the issue, and the PR. I am not sure why it is difficult to understand, or why this PR is blocked. This feature is creating outages and it's a very serious issue. Please unblock the PR.

lhotari commented Feb 21, 2025

I have explained why this is not how AsyncTokenBucket works and is designed.

Sorry, I overlooked the broker's behavior of re-reading network packets from the channel.

In the current implementation of the AsyncTokenBucket, the tokens only become positive after multiple rounds of refreshing. This behavior is unacceptable, as it introduces unnecessary delays in processing. The primary concern here is that I can tolerate a surge in traffic during the next rate limit cycle, but I cannot accept no traffic at all.

@nodece The AsyncTokenBucket itself is working correctly. I agree that it's a problem if no traffic gets accepted. We should be addressing that problem and understanding why it happens.

I can see why it happens and the problem of overuse should be solved instead. Another matter is that rate limiters should have a separate configuration property for the capacity of the bucket so that it's possible to tolerate a configurable period of bursty traffic.

Regarding the overuse, here is how it can currently happen:

  • dispatcher limiters are checked for tokens before a read happens in the dispatcher's readMoreEntries method, which calls the calculateToRead method where the tokens are checked.
  • dispatcher limiter tokens are consumed after the messages have been sent; this happens in the acquirePermitsForDeliveredMessages method, called from trySendMessagesToConsumers.

The reason why there could be overuse is that all inflight reads between readMoreEntries and trySendMessagesToConsumers can cause it. I wonder if we could define the correct behavior? I'd assume that a "surge in traffic" should cause a backlog for the consumers instead of letting the consumers overuse the tokens.

As mentioned above, another matter would be to allow configurable bursts to happen. That's already a feature supported by AsyncTokenBucket with the capacity property. This feature would have to be exposed in configuration for each rate limiter.

For preventing the overuse of dispatch tokens, one possible solution would be to consume the tokens directly in readMoreEntries. Since tokens won't necessarily be used, there should be a way to return tokens back into the token bucket. That could happen in the place of acquirePermitsForDeliveredMessages where this adjustment of the balance happens. It's possible that some other execution paths would need to be handled.
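A rough sketch of that reserve-then-refund idea follows; the interface and method names below are hypothetical, not the actual Pulsar dispatcher or rate limiter API.

    // Hypothetical sketch of the "consume up-front, refund unused" idea described above.
    interface RefundableRateLimiter {
        void consumeTokens(long tokens);   // subtract tokens; the balance may go negative
        void returnTokens(long tokens);    // give back tokens that were reserved but not used
    }

    // Reserve tokens for the whole read before issuing it (readMoreEntries), then refund
    // the part that was not actually delivered (acquirePermitsForDeliveredMessages).
    static void readWithReservation(RefundableRateLimiter limiter, long requestedPermits,
                                    long deliveredPermits) {
        limiter.consumeTokens(requestedPermits);
        long unused = requestedPermits - deliveredPermits;
        if (unused > 0) {
            limiter.returnTokens(unused);
        }
    }

This way, concurrent inflight reads can no longer pass the token check without accounting for the tokens they are about to use.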

with a huge spike in traffic, and publish/consume stuck for a long time

We can directly increase pendingConsumedTokens, please check my test.

When you use any implementation, tests shouldn't poke into the internal data structures. It's not a valid unit test when internal members are used directly. The reason pendingConsumedTokens shouldn't be manipulated directly is that when tokens are consumed through the public API, there's a guarantee that the state will be consistent within the granularity of the configured "resolution" of the AsyncTokenBucket.
In this case, the reproducer should first be an end-to-end test case; once that exists, we can go down to the unit level.
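For reference, a hedged sketch of what a manual, system-level reproducer might look like; this is not part of the PR, the topic name is an assumption and is expected to already have a publish rate limit configured, and the burst size is arbitrary.

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;

    // Hedged sketch of a manual reproducer: publish a burst far above the configured
    // publish rate, then observe how long publishing stays throttled afterwards.
    // Run several copies in parallel to exercise the multi-threaded token consumption path.
    public class PublishSpikeReproducer {
        public static void main(String[] args) throws Exception {
            String topic = "persistent://public/default/rate-limited-topic"; // assumed topic
            try (PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build()) {
                Producer<byte[]> producer = client.newProducer()
                        .topic(topic)
                        .blockIfQueueFull(true)   // keep the burst flowing without send errors
                        .create();
                byte[] payload = new byte[1024];
                for (int i = 0; i < 1_000_000; i++) {   // burst well above the configured rate
                    producer.sendAsync(payload);
                }
                producer.flush();
                producer.close();
            }
        }
    }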

lhotari commented Feb 21, 2025

This behavior is unacceptable, as it introduces unnecessary delays in processing. The primary concern here is that I can tolerate a surge in traffic during the next rate limit cycle, but I cannot accept no traffic at all.

Yes. That's what I have been saying from the beginning in the Slack channel, the issue, and the PR. I am not sure why it is difficult to understand, or why this PR is blocked. This feature is creating outages and it's a very serious issue. Please unblock the PR.

@rdhabalia I have replied in #24002 (comment). Please also reply to the questions I have asked in #24002 (comment) . That will help us make progress together.

@rdhabalia (Contributor, Author) commented:

@rdhabalia I have replied in #24002 (comment). Please also reply to the questions I have asked in #24002 (comment) . That will help us make progress together.

I don't think you have read the fundamental issue that I and others are describing. Also, I mentioned in the very first line of the issue and the PR that the issue was reproduced after cherry-picking the new PR. However, consider publish rate-limiting where multiple producers try to consume tokens at the same time: this can make the token count negative due to eventual consistency, and it puts you in a situation where it takes a very long time to resume publishing.

For preventing the overuse of dispatch tokens, one possible solution would be to consume the tokens directly in readMoreEntries

Anyway, let's not put a hack into a top layer such as the Consumer/Dispatcher or the Producer; rather, let's fix the rate limiter. If the rate limiter is thread-safe and allows multiple threads, then it should not take a long time to recover positive tokens and resume operations. Let's fix this fundamental problem and then we can discuss the next step.

lhotari commented Feb 21, 2025

@rdhabalia I have replied in #24002 (comment). Please also reply to the questions I have asked in #24002 (comment) . That will help us make progress together.

I don't think you have read the fundamental issue that I and others are describing. Also, I mentioned in the very first line of the issue and the PR that the issue was reproduced after cherry-picking the new PR. However, consider publish rate-limiting where multiple producers try to consume tokens at the same time: this can make the token count negative due to eventual consistency, and it puts you in a situation where it takes a very long time to resume publishing.

There is no "fundamental issue" in AsyncTokenBucket. I have addressed this in my previous messages. Although the changes in this PR might solve your issue, it doesn't mean that it's the correct solution. Unless there's a reproducer at the system level and until you share more details about the use case, I would have to guess a lot about what scenario and problem you are facing. My current guess is about the behavior where the previous implementation tolerated overuse of tokens. This doesn't mean that the new solution should behave in that way. Instead, let's fix the fundamental issue that is really the problem. Unless we have a way to reproduce the issue you are facing, it's hard to know what problem you are facing and what solution solves that problem. Based on the title of the PR, the problem is "with a huge spike in traffic, the publish/consume is stuck for a long time." That's something that we can solve in the token bucket based rate limiter as I have suggested in my previous message.

For preventing the overuse of dispatch tokens, one possible solution would be to consume the tokens directly in readMoreEntries

Anyway, let's not put a hack into a top layer such as the Consumer/Dispatcher or the Producer; rather, let's fix the rate limiter. If the rate limiter is thread-safe and allows multiple threads, then it should not take a long time to recover positive tokens and resume operations. Let's fix this fundamental problem and then we can discuss the next step.

@rdhabalia I'll let you digest my previous messages and answer the questions about the details of your problem. My previous messages explain why the previous rate limiter ignored overuse and tolerated a "huge spike in traffic". One could say that this is a bug and another would say that it's a feature. Since the new rate limiter is based on the token bucket algorithm, it doesn't have a feature where it loses track of consumed tokens like the previous rate limiter did. Losing track of consumed tokens is a feature that some users like you are depending on and considering as correct behavior.

However, for a token bucket implementation, that's incorrect behavior. That's why this "with a huge spike in traffic, the publish/consume is stuck for a long time" problem has to be solved at a different level. We can pre-validate the solution once you reply to the questions and share more details. There will be a chance to continue validation and solve the problem once there are solutions to validate.

To summarize, I'm expecting this from your side:

@rdhabalia Would you be able to provide the above details? That will help us make progress in solving the problem that you and possibly others are facing. I completely acknowledge that this problem exists and we can solve it together.

lhotari commented Feb 21, 2025

Short update: I have been able to reproduce the issue. It is a bug that was introduced with the PIP-322 changes, and I'm currently looking into the details.

lhotari commented Feb 21, 2025

This PR contains the reproducer and a fix for the problem: #24012. The PR is still WIP since the test fails due to a minor detail: the rate is 2x for the first second. I haven't yet investigated whether it's a test problem or a real issue.

Labels: area/broker, doc-not-needed (Your PR changes do not impact docs)

Successfully merging this pull request may close these issues.

[Bug] Rate-Limiter fails with a huge spike in traffic, and publish/consume stuck for a long time.
4 participants