Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
Version: 4.0.8 via Helm chart pulsar-4.4.0 (but the bug is also present in current master)
Related: #25119 (different bug, same area — phase 2 __compaction cursor)
Issue Description
Summary
We found this bug because we update topic policies frequently in our Pulsar cluster and noticed thousands of backlogged events accumulating in __change_events. Compaction is triggered automatically whenever change events are added, so we hit the following failure continuously:
Compaction phase 2 fails every time with a ConnectException because the broker disconnects the compaction consumer as part of processing its own seek request, causing channelInactive to fire on the client and kill the in-flight seek future before the broker sends the success response. Since AbstractTwoPhaseCompactor.phaseTwoSeekThenLoop has no retry logic for transient seek failures, every compaction attempt aborts. The __compaction subscription backlog grows without bound.
Affected Topics
Any topic with compaction enabled. Most visibly affects __change_events system topics because SystemTopic.isCompactionEnabled() hardcodes true and the effective compactionThreshold is 0 bytes, so compaction is triggered on any non-zero backlog. Frequent topic-level policy writes (each write appends a message to __change_events) cause compaction to be triggered and fail in a continuous loop.
Expected Behavior
Compaction phase 2 seeks the __compaction reader back to the start of the compacted range and reads forward, producing a new compacted ledger.
Actual Behavior
Compaction fails on every attempt and the __compaction subscription backlog grows indefinitely.
Error messages
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.p.compaction.AbstractTwoPhaseCompactor - Commencing phase two of compaction for
persistent://my-tenant/my-namespace/__change_events-partition-2, from 1218818:0:2:-1 to 1330087:5:2:-1, compacting 12 keys to ledger
1341961
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.p.client.impl.ConsumerImpl -
[persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Seeking subscription to the message 1218818:0:2:-1
[pulsar-io-3-4] INFO o.a.p.broker.service.Consumer - Disconnecting consumer:
Consumer{subscription=PulsarCompactorSubscription{topic=persistent://my-tenant/my-namespace/__change_events-partition-2,
name=__compaction}, consumerId=27241, consumerName=6QQBD, address=[id: 0x6da7169d, L:/10.10.x.x:6650 - R:/10.10.x.x:53634] [SR:10.10.x.x,
state:Connected]}
[pulsar-io-3-4] INFO o.a.p.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer
Consumer{subscription=PulsarCompactorSubscription{topic=persistent://my-tenant/my-namespace/__change_events-partition-2,
name=__compaction}, ...}
[pulsar-io-3-4] INFO o.a.p.broker.service.persistent.PersistentSubscription -
[persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Successfully disconnected consumers from subscription,
proceeding with cursor reset
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.b.mledger.impl.ManagedCursorImpl -
[my-tenant/my-namespace/persistent/__change_events-partition-2] Initiate reset readPosition from 1330087:6 to 1218818:0 (ackSet is null)
on cursor __compaction
[pulsar-io-3-6] INFO o.a.p.client.impl.ConnectionHandler - [persistent://my-tenant/my-namespace/__change_events-partition-2]
[__compaction] Closed connection [id: 0xb3fcdb76, L:/10.10.x.x:53634 ! R:/10.10.x.x:6650] -- Will try again in 0.1 s, hostUrl: null
[pulsar-io-3-6] ERROR o.a.p.client.impl.ConsumerImpl - [persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction]
Failed to reset subscription: Disconnected from server at /10.10.x.x:6650
[broker-client-shared-internal-executor-5-1] WARN o.a.p.broker.service.persistent.PersistentTopic -
[persistent://my-tenant/my-namespace/__change_events-partition-2] Compaction failure.
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Failed to seek the
subscription __compaction of the topic persistent://my-tenant/my-namespace/__change_events-partition-2 to the message 1218818:0:2:-1
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
at org.apache.pulsar.client.impl.ConsumerImpl.failSeek(ConsumerImpl.java:2631)
~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8]
at org.apache.pulsar.client.impl.ConsumerImpl.lambda$seekAsyncInternal$60(ConsumerImpl.java:2603)
~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8]
at org.apache.pulsar.client.impl.ClientCnx.channelInactive(ClientCnx.java:342)
~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8]
Caused by: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Failed to seek the subscription __compaction of the topic
persistent://my-tenant/my-namespace/__change_events-partition-2 to the message 1218818:0:2:-1
Disconnected from server at /10.10.x.x:6650
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.b.mledger.impl.ManagedCursorImpl -
[my-tenant/my-namespace/persistent/__change_events-partition-2] reset readPosition to 1218818:0 (ackSet is null) before current read
readPosition 1330087:6 on cursor __compaction
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.p.broker.service.ServerCnx - [/10.10.x.x:53634]
[persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Reset subscription to message id 1218818:0 (ackSet is
null)
Reproducing the issue
- Create a namespace with frequent topic-level policy writes to cause the __change_events backlog to grow. For example, calling setMaxUnackedMessagesPerConsumer per topic on every consumer restart. This is a real-world use case that triggered these findings.
- Observe compaction triggering automatically (threshold = 0 bytes for system topics) or trigger manually:
  pulsar-admin topics compact persistent://my-tenant/my-namespace/__change_events
- Observe compaction failure in broker logs
Additional information
Root Cause
The sequence in PersistentSubscription.resetCursorInternal (PersistentSubscription.java:916) is:
1. disconnect active consumers via dispatcher.disconnectActiveConsumers(true)
2. reset the managed cursor position via ManagedCursorImpl.internalResetCursor
3. send commandSender.sendSuccessResponse(requestId) back in ServerCnx.handleSeek's thenRun.
Step 1 closes the consumer's Netty channel server-side, which fires channelInactive on the client (ClientCnx.java:328). channelInactive immediately fails all entries in pendingRequests with ConnectException (ClientCnx.java:341–344), including the seek request that triggered the reset. By the time the server sends the success response in step 3, the client has already failed the seek future and aborted compaction.
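The ordering problem can be modeled in isolation with a toy pending-request table. This is only a sketch of the race as described above, not Pulsar code: the class SeekRaceModel and its methods are hypothetical stand-ins for the client-side pendingRequests handling, and IllegalStateException stands in for ConnectException.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of a client-side pending-request table (hypothetical, not Pulsar code). */
public class SeekRaceModel {
    // requestId -> future waiting for the broker's response
    private final Map<Long, CompletableFuture<Void>> pendingRequests = new ConcurrentHashMap<>();

    /** Registers an in-flight seek and returns the future the caller waits on. */
    public CompletableFuture<Void> sendSeek(long requestId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pendingRequests.put(requestId, future);
        return future;
    }

    /** Models channelInactive: every pending request is failed at once. */
    public void channelInactive() {
        pendingRequests.values().forEach(f ->
                f.completeExceptionally(new IllegalStateException("Disconnected from server")));
        pendingRequests.clear();
    }

    /** Models a success response; returns false if no request is waiting for it anymore. */
    public boolean handleSuccess(long requestId) {
        CompletableFuture<Void> future = pendingRequests.remove(requestId);
        return future != null && future.complete(null);
    }

    public static void main(String[] args) {
        SeekRaceModel client = new SeekRaceModel();
        CompletableFuture<Void> seek = client.sendSeek(1L);
        client.channelInactive();                     // step 1 reaches the client first
        boolean delivered = client.handleSuccess(1L); // step 3 arrives too late
        System.out.println("seek failed: " + seek.isCompletedExceptionally()); // true
        System.out.println("success delivered: " + delivered);                 // false
    }
}
```

In this model the seek future is already failed by the time the success response is handled, which matches the log order shown under "Error messages": the client logs "Failed to reset subscription" before the broker logs "Reset subscription to message id".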
The disconnect in step 1 is necessary for correctness — the dispatcher may have messages in flight to the consumer that would be inconsistent with the new cursor position — so the server-side ordering cannot simply be reversed. The fix belongs on the compactor side.
AbstractTwoPhaseCompactor.phaseTwoSeekThenLoop has no retry logic — a failed seekAsync propagates directly to whenComplete and aborts:
reader.seekAsync(from).thenCompose((v) -> {
    // phase two loop
}).whenComplete((res, exception) -> {
    if (exception != null) {
        deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
            promise.completeExceptionally(exception); // no retry
        });
    }
});
Potential Fix
The seek is idempotent and the ConnectException is always transient in this context — the broker disconnects the consumer as part of processing the seek, so by the time the consumer reconnects the cursor is already at the correct position. Retrying seekAsync with a short delay allows the consumer to reconnect, at which point the seek succeeds immediately. But I leave that up to the implementors.
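One possible shape for such a retry is a small helper around any operation that returns a CompletableFuture. This is a minimal sketch under stated assumptions, not a proposed patch: the SeekRetry class, its retry method, and the attempt count and delay are all hypothetical, and it retries on every failure, whereas a real fix would probably retry only on the transient ConnectException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: retries an async operation with a fixed delay between attempts. */
public class SeekRetry {

    /** Runs op; on failure waits delayMillis and tries again, up to maxAttempts in total. */
    public static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> op,
                                                  int maxAttempts, long delayMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(op, maxAttempts, delayMillis, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> op, int attemptsLeft,
                                    long delayMillis, CompletableFuture<T> result) {
        CompletableFuture<T> current;
        try {
            current = op.get();
        } catch (Throwable t) {
            // Treat a synchronous throw like a failed attempt.
            current = CompletableFuture.failedFuture(t);
        }
        current.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptsLeft <= 1) {
                result.completeExceptionally(error); // attempts exhausted, surface last error
            } else {
                // Schedule the next attempt after the delay (Java 9+ API).
                Executor delayed =
                        CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
                delayed.execute(() -> attempt(op, attemptsLeft - 1, delayMillis, result));
            }
        });
    }
}
```

With a helper like this, the call in phaseTwoSeekThenLoop could be wrapped as SeekRetry.retry(() -> reader.seekAsync(from), 3, 100), where 3 attempts and 100 ms are illustrative values only. The delay would need to be long enough for the consumer to reconnect (the client log above shows "Will try again in 0.1 s").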
Are you willing to submit a PR?
- I'm willing to submit a PR!