Conversation

@oneby-wang
Contributor

Motivation

There is no independent receiverQueueSize config for the single-partition consumers inside MultiTopicsConsumerImpl and PatternMultiTopicsConsumerImpl. Although the maxTotalReceiverQueueSizeAcrossPartitions config can limit the in-memory messages of a single topic with multiple partitions, it can't limit the in-memory messages across multiple topics.

For example, suppose we subscribe to a regex pattern that matches 1000 non-partitioned topics. Before this PR, each non-partitioned topic consumer's receiverQueueSize is 1000 (each ConsumerImpl uses the same receiverQueueSize value as the PatternMultiTopicsConsumerImpl), so the maximum number of messages in memory is 1000 + 1000 * 1000 = 1,001,000. Ignoring the insignificant 1000 for the parent queue, if each message is 8 KB, we need 1,000,000 * 8 KB = 7,812.5 MB of memory just to boot the application in a catch-up read situation, which is unnecessary. Each newly added consumer is granted its full receiver queue size as permits up front in startReceivingMessages:

private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}",
                topic, newConsumers.size(), getState());
    }
    if (getState() == State.Ready) {
        newConsumers.forEach(consumer -> {
            consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(),
                    consumer.getCurrentReceiverQueueSize());
            internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true));
        });
    }
}
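
As a back-of-the-envelope check of the estimate above, a minimal sketch using the example numbers (the 8 KB average message size is the assumption from the example, not a measured value):

public class ReceiverQueueMemoryEstimate {
    public static void main(String[] args) {
        int topics = 1000;                 // non-partitioned topics matched by the pattern
        int receiverQueueSize = 1000;      // used by the parent and by every ConsumerImpl
        long messageSizeBytes = 8 * 1024;  // assumed average message size (8 KB)

        // Parent queue plus one full queue per topic consumer.
        long maxMessages = receiverQueueSize + (long) topics * receiverQueueSize;
        double maxMemoryMb = maxMessages * messageSizeBytes / (1024.0 * 1024.0);

        System.out.printf("max in-memory messages: %d (~%.1f MB)%n", maxMessages, maxMemoryMb);
        // Prints: max in-memory messages: 1001000 (~7820.3 MB)
    }
}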

For backward compatibility, a multiTopicsSinglePartitionReceiverQueueSizeEnable switch is added, with the default value set to false.

Modifications

Add an independent multiTopicsSinglePartitionReceiverQueueSize config for the single-partition consumers in MultiTopicsConsumerImpl to reduce memory consumption.
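
For illustration only, a minimal sketch of how the new settings might be applied from client code via the generic ConsumerBuilder.loadConf mechanism; whether these keys are actually picked up by loadConf depends on how the PR wires them into ConsumerConfigurationData, so treat the wiring (and the value 10) as assumptions:

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

public class PatternConsumerSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Hypothetical wiring of the new settings through loadConf.
        Map<String, Object> extraConf = new HashMap<>();
        extraConf.put("multiTopicsSinglePartitionReceiverQueueSizeEnable", true); // default is false
        extraConf.put("multiTopicsSinglePartitionReceiverQueueSize", 10);         // hypothetical value

        Consumer<byte[]> consumer = client.newConsumer()
                .topicsPattern(Pattern.compile("persistent://public/default/topic-.*"))
                .subscriptionName("my-sub")
                .receiverQueueSize(1000) // queue size of the multi-topics (parent) consumer
                .loadConf(extraConf)
                .subscribe();

        // ... consume messages ...

        consumer.close();
        client.close();
    }
}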

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: oneby-wang#7

@github-actions bot added the doc-not-needed (Your PR changes do not impact docs) label on Oct 17, 2025
@oneby-wang
Contributor Author

If this PR is OK, I'll add unit tests to cover the code change.

@lhotari
Member

> There is no independent receiverQueueSize config for the single-partition consumers inside MultiTopicsConsumerImpl and PatternMultiTopicsConsumerImpl. Although the maxTotalReceiverQueueSizeAcrossPartitions config can limit the in-memory messages of a single topic with multiple partitions, it can't limit the in-memory messages across multiple topics.

> For example, suppose we subscribe to a regex pattern that matches 1000 non-partitioned topics. Before this PR, each non-partitioned topic consumer's receiverQueueSize is 1000, so the maximum number of messages in memory is 1000 + 1000 * 1000 = 1,001,000. Ignoring the insignificant 1000, if each message is 8 KB, we need 1,000,000 * 8 KB = 7,812.5 MB of memory just to boot the application in a catch-up read situation, which is unnecessary.

This is a valid problem. However, since it requires public API changes, it's better to start a discussion on the dev mailing list to ask for feedback from other Pulsar community members. Changing the public API will eventually require a PIP. Before creating a PIP, it's better to ask for feedback on ways to address this problem and what approach others would recommend. Adding more options to the API isn't great from a usability perspective since it adds more overhead in understanding the usage. It would be better if the multi-topics consumer could automatically tune itself to handle the situation. That might have been the purpose of PIP-74 receiver queue autoscaling (PR 14494), so it's worth checking that out too.

Are you already using PIP-74 memory limiter (enabled by default) and receiver queue autoscaling?

@oneby-wang
Copy link
Contributor Author

oneby-wang commented Oct 18, 2025

I haven't tried autoscaling yet. I noticed that we can auto-scale receiverQueueSize using the memory limiter by turning on the autoScaledReceiverQueueSizeEnabled switch, but after some code analysis, I think autoscaling probably won't work in the multi-topics situation.

Every single-partition consumer starts by granting currentReceiverQueueSize permits (a flow request to the broker), meaning that the consumer can receive up to currentReceiverQueueSize messages:

private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}",
                topic, newConsumers.size(), getState());
    }
    if (getState() == State.Ready) {
        newConsumers.forEach(consumer -> {
            consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(),
                    consumer.getCurrentReceiverQueueSize());
            internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true));
        });
    }
}

MultiTopicsConsumerImpl creates each single-partition consumer with its own receiverQueueSize and sets the batch receive maxNumMessages to (receiverQueueSize / 2):

private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> configurationData, String partitionName,
                                               int partitionIndex, CompletableFuture<Consumer<T>> subFuture,
                                               boolean createIfDoesNotExist, Schema<T> schema) {
    BatchReceivePolicy internalBatchReceivePolicy = BatchReceivePolicy.builder()
            .maxNumMessages(Math.max(configurationData.getReceiverQueueSize() / 2, 1))
            .maxNumBytes(-1)
            .timeout(1, TimeUnit.MILLISECONDS)
            .build();
    configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
    configurationData = configurationData.clone();
    return ConsumerImpl.newConsumerImpl(client, partitionName,
            configurationData, client.externalExecutorProvider(),
            partitionIndex, true, listener != null, subFuture,
            startMessageId, schema, this.internalConsumerInterceptors,
            createIfDoesNotExist, startMessageRollbackDurationInSec);
}

If the autoScaledReceiverQueueSizeEnabled switch is on, currentReceiverQueueSize is initialized to minReceiverQueueSize():

public void initReceiverQueueSize() {
    if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
        CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, minReceiverQueueSize());
    } else {
        CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
    }
}

According to the above analysis, minReceiverQueueSize() is 2 * batchReceivePolicy.getMaxNumMessages() - 2 = receiverQueueSize - 2 ≈ receiverQueueSize. So the application boots with about (n * receiverQueueSize) messages, where n is the number of topics:

public int minReceiverQueueSize() {
    int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
    if (batchReceivePolicy.getMaxNumMessages() > 0) {
        // consumerImpl may store (half-1) permits locally.
        size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
    }
    return size;
}

Every single-partition consumer's minReceiverQueueSize() is about receiverQueueSize because the batch receive maxNumMessages is (receiverQueueSize / 2), so the current queue size can't be reduced below 2 * batchReceivePolicy.getMaxNumMessages() - 2 ≈ receiverQueueSize:

protected void reduceCurrentReceiverQueueSize() {
    if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
        return;
    }
    int oldSize = getCurrentReceiverQueueSize();
    int newSize = Math.max(minReceiverQueueSize(), oldSize / 2);
    if (oldSize > newSize) {
        setCurrentReceiverQueueSize(newSize);
    }
}
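
Putting the example numbers through this chain (a sketch of the reasoning above, not code from the PR; note the conclusion does not depend on the value of INITIAL_RECEIVER_QUEUE_SIZE because of the Math.max):

public class AutoScaleLowerBoundSketch {
    public static void main(String[] args) {
        int receiverQueueSize = 1000;                            // inherited by every ConsumerImpl
        int maxNumMessages = Math.max(receiverQueueSize / 2, 1); // 500, set by createInternalConsumer

        // minReceiverQueueSize(): whatever min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize)
        // returns, the Math.max pushes the result to at least 2 * maxNumMessages - 2.
        int minSize = 2 * maxNumMessages - 2;                    // 998, roughly receiverQueueSize

        // initReceiverQueueSize() with autoscaling on: start at minReceiverQueueSize().
        int currentSize = minSize;                               // 998

        // reduceCurrentReceiverQueueSize(): newSize = Math.max(minReceiverQueueSize(), oldSize / 2),
        // so the queue can never shrink below minSize -- autoscaling cannot reduce it further.
        int newSize = Math.max(minSize, currentSize / 2);        // 998

        System.out.printf("maxNumMessages=%d, minSize=%d, afterReduce=%d%n",
                maxNumMessages, minSize, newSize);
    }
}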

This PR can tune each single-partition consumer's receiverQueueSize and batch receive maxNumMessages via the multiTopicsSinglePartitionReceiverQueueSize config, which may help to solve this problem.
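
For illustration, applying a hypothetical value of 10 for the new config (an arbitrary choice, not a default from this PR) to the 1000-topic example from the motivation:

public class TunedQueueMemoryEstimate {
    public static void main(String[] args) {
        int topics = 1000;
        int parentReceiverQueueSize = 1000;        // multi-topics (parent) consumer queue
        int singlePartitionReceiverQueueSize = 10; // hypothetical multiTopicsSinglePartitionReceiverQueueSize
        long messageSizeBytes = 8 * 1024;          // assumed average message size (8 KB)

        long maxMessages = parentReceiverQueueSize + (long) topics * singlePartitionReceiverQueueSize;
        double maxMemoryMb = maxMessages * messageSizeBytes / (1024.0 * 1024.0);

        System.out.printf("max in-memory messages: %d (~%.1f MB)%n", maxMessages, maxMemoryMb);
        // Prints: max in-memory messages: 11000 (~85.9 MB), versus ~7820 MB before
    }
}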

The above is pure code analysis; I haven't tested it myself yet.

Moreover, I think the multi-topics consumer and its inner single-partition consumers are like a parent container and child containers, and their receiverQueueSize values should be tunable independently in some way.
