Status: Open
Label: type/bug
Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
Pulsar: 3.0.13
Operating System: Linux
Java: 17.0.18
Issue Description
A topic fails to load because the reader of the __change_events topic gets stuck at an offset inside the compacted ledger.
Sequence of events:
- Producer and consumer clients try to produce and consume messages on the topic persistent://t/n/t
- The topic is not yet loaded in broker A
- The produce and consume calls trigger topic loading in broker A
- This in turn triggers loading of the topic policies cache by creating a reader on the persistent://t/n/__change_events topic
- The topic persistent://t/n/__change_events is owned by broker B
- While reading from persistent://t/n/__change_events, the connection closes because broker B unloaded that topic (due to a restart)
- The topic persistent://t/n/__change_events is now owned by broker A
- The reader reconnects after a 0.1s timeout and triggers subscription creation on persistent://t/n/__change_events at offset x
- This reader gets stuck at offset x,
- whereas all other readers of persistent://t/n/__change_events reconnect at offset y and work normally
- The compaction horizon at that time was z, such that x < z and y = z
- The topic never gets loaded until we manually restart broker A, and all produce and consume calls to this topic fail until then
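The offset relation above (x < z, y = z) can be sketched to show why only one reader gets stuck. This is a minimal illustration, not Pulsar's actual code: the class and method names are hypothetical, and it assumes a reader resumed below the compaction horizon must be fast-forwarded to the horizon, since entries below it no longer exist in the original ledger.

```java
// Hypothetical sketch (not Pulsar's implementation): a reader resumed at an
// offset below the compaction horizon waits forever for entries that
// compaction has already removed, unless it is fast-forwarded.
final class ReaderPositionCheck {
    // Returns the first offset the reader can actually make progress from,
    // assuming offsets below the horizon are only available via compacted data.
    static long resumePosition(long requestedOffset, long compactionHorizon) {
        return Math.max(requestedOffset, compactionHorizon);
    }

    public static void main(String[] args) {
        long x = 100, z = 250, y = 250; // from the report: x < z, y = z
        // The stuck reader at x would need to be moved up to z; the healthy
        // readers at y = z already sit at a readable position.
        System.out.println(resumePosition(x, z));
        System.out.println(resumePosition(y, z));
    }
}
```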
Impacts:
This makes all topics of the namespace t/n that are owned by broker A unavailable.
Further Details:
- The client gets this error while calling consume (every ~1 min with default configurations):
broker-A x:x:x:x [http-0] ERROR org.apache.pulsar.broker.service.ServerCnx - A failed consumer with id is already present on the connection. consumerId: 32, remoteAddress: /x.x.x.x:54598, subscription: s/p1
- The producer gets this error (every ~1 min with default configurations):
org.apache.pulsar.client.impl.ProducerImpl - [persistent://t/n/t-partition-0] [p1] Temporary error in creating producer: request timeout {'durationMs': '30000', 'reqId':'2961594912657351684', 'remote':'x.x.x.x/x.x.x.x:6650', 'local':'/x.x.x.x:48342'}
- On the broker we get the following error for the producer (every ~1 min until the broker was restarted):
12:24:01.620 [pulsar-io-4-9] INFO org.apache.pulsar.broker.service.ServerCnx - [/x.x.x.x:59296] Closed producer before its creation was completed. producerId=220
- On the broker we get the following error for the consumer (every ~1 min until the broker was restarted):
12:29:04.579 [http-1] ERROR org.apache.pulsar.broker.service.ServerCnx - A failed consumer with id is already present on the connection. consumerId: 121, remoteAddress: /x.x.x.x:59016, subscription: s/p1
java.util.concurrent.CompletionException: java.lang.IllegalStateException: Closed consumer before creation was complete
Caused by: java.lang.IllegalStateException: Closed consumer before creation was complete
Error messages
12:29:04.579 [http-1] ERROR org.apache.pulsar.broker.service.ServerCnx - A failed consumer with id is already present on the connection. consumerId: 121, remoteAddress: /x.x.x.x:59016, subscription: s/p1
java.util.concurrent.CompletionException: java.lang.IllegalStateException: Closed consumer before creation was complete
at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:413) ~[?:?]
at java.util.concurrent.CompletableFuture.getNow(CompletableFuture.java:2134) ~[?:?]
at org.apache.pulsar.broker.service.ServerCnx.getErrorCodeWithErrorLog(ServerCnx.java:3339) ~[org.apache.pulsar-pulsar-broker-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$24(ServerCnx.java:1262) ~[org.apache.pulsar-pulsar-broker-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1163) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.IllegalStateException: Closed consumer before creation was complete
at org.apache.pulsar.broker.service.ServerCnx.handleCloseConsumer(ServerCnx.java:2132) ~[org.apache.pulsar-pulsar-broker-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:171) ~[org.apache.pulsar-pulsar-common-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202) ~[io.netty-netty-handler-4.1.122.Final.jar:4.1.122.Final]
at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164) ~[io.netty-netty-handler-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.122.Final.jar:4.1.122.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[io.netty-netty-codec-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[io.netty-netty-handler-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) ~[io.netty-netty-transport-classes-epoll-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501) ~[io.netty-netty-transport-classes-epoll-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399) ~[io.netty-netty-transport-classes-epoll-4.1.122.Final.jar:4.1.122.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998) ~[io.netty-netty-common-4.1.122.Final.jar:4.1.122.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.122.Final.jar:4.1.122.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.122.Final.jar:4.1.122.Final]
... 1 more
Reproducing the issue
Right now we don't know how to reproduce this issue. It has appeared 3-4 times during broker rollouts.
This happens on clusters where:
- a broker rollout is in progress
- the number of topics per namespace is high (~200-300)
Additional information
On producer side:
- When producer creation times out due to a client-side timeout, the producer instance is removed from the broker's producer list, but the future used to load the system topic cache remains. The next produce call from the client creates a new producer and also a new future to load the system topic cache. So each time, the producer is created and then removed after the timeout, but the future is not. These futures accumulate and are only released when we restart the broker, with the following error (for every topic that was not loaded, ~50 times each; since there were ~50 produce calls per topic before the broker was restarted, we got ~450 instant errors of this type on restart):
13:20:02.306 [broker-client-shared-internal-executor-6-1] ERROR org.apache.pulsar.broker.service.BrokerService - Topic creation encountered an exception by initialize topic policies service. topic_name=persistent://t/n/t-partition-0 error_message=The consumer which subscribes the topic persistent://t/n/__change_events with subscription name reader-4f8aabbb2e was already closed when cleaning and closing the consumers
org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: The consumer which subscribes the topic persistent://t/n/__change_events with subscription name reader-4f8aabbb2e was already closed when cleaning and closing the consumers
at org.apache.pulsar.client.impl.ConsumerBase.failPendingReceives(ConsumerBase.java:374) ~[org.apache.pulsar-pulsar-client-original-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
at org.apache.pulsar.client.impl.ConsumerBase.lambda$failPendingReceive$1(ConsumerBase.java:355) ~[org.apache.pulsar-pulsar-client-original-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.122.Final.jar:4.1.122.Final]
at java.lang.Thread.run(Thread.java:840) ~[?:?]
- The exception is AlreadyClosedException on the reader because restarting the broker triggered reader.close.
- This results in a memory leak: every producer creation call creates a future, and these futures accumulate over time.
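The leak pattern described above, and the cleanup it is missing, can be sketched as follows. All names here are illustrative, not Pulsar's actual fields: the point is that a future registered per produce attempt must be removed when it completes (successfully or not), otherwise timed-out attempts accumulate entries forever.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the accumulation problem: a cache of pending
// policies-load futures keyed by topic. Without the whenComplete cleanup,
// every timed-out produce attempt would leave its entry behind.
final class PolicyCacheFutures {
    private final Map<String, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    CompletableFuture<Void> register(String topic) {
        return pending.computeIfAbsent(topic, t -> {
            CompletableFuture<Void> f = new CompletableFuture<>();
            // Drop the entry as soon as the future finishes, whether it
            // succeeded, failed, or timed out; this is the missing cleanup.
            f.whenComplete((v, e) -> pending.remove(t, f));
            return f;
        });
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With this shape, a client-side timeout that fails the future also releases the cache entry, instead of leaving it until a broker restart.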
On consumer side:
- Consumer creation fails with the following error:
12:47:01.765 [http-0] ERROR org.apache.pulsar.broker.service.ServerCnx - A failed consumer with id is already present on the connection. consumerId: 75, remoteAddress: /x.x.x.x:58204, subscription: s/p1 java.util.concurrent.CompletionException: java.lang.IllegalStateException: Closed consumer before creation was complete
- Here all consume calls fail directly, because the broker will not remove the first consumer from the consumer list until its creation fails, so there is no memory leak.
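The consumer-side behavior can be sketched like this. Again this is a hypothetical illustration, not ServerCnx's actual code: it assumes the connection keeps the pending/failed consumer keyed by consumerId, so a retry with the same id is rejected outright rather than piling up new state.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a connection that tracks one future per consumerId.
// While the first (failed) consumer is still registered, a second subscribe
// with the same id is rejected, mirroring the broker log
// "A failed consumer with id is already present on the connection."
final class ConsumerSlot {
    private final Map<Long, CompletableFuture<Void>> consumers = new ConcurrentHashMap<>();

    String subscribe(long consumerId) {
        CompletableFuture<Void> existing =
                consumers.putIfAbsent(consumerId, new CompletableFuture<>());
        if (existing != null) {
            return "rejected"; // fail fast, no new state accumulated
        }
        return "pending";
    }
}
```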
Topic policy cache loading:
- Right now, there is no timeout on the "initPolicesCache(reader, stageFuture)" call in SystemTopicBasedTopicPoliciesService.java, so if a reader fails to load topic policies it can get stuck indefinitely and silently.
So we propose the following changes:
Add a timeout and retries to the "initPolicesCache" call; if it still fails after a certain number of retries, unload the topic to a different broker, and emit metrics and logs for this failure.
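A minimal sketch of the proposed timeout-and-retry wrapper, assuming the attempt is exposed as a CompletableFuture (the class and method names are illustrative, not the actual patch): each attempt is bounded with orTimeout, and a failed or timed-out attempt starts a fresh one until the retry budget is exhausted, at which point the failure surfaces instead of hanging silently.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of the proposed fix for initPolicesCache: bound each
// attempt with a timeout and retry a few times before surfacing the failure.
final class BoundedInit {
    static <T> CompletableFuture<T> withTimeoutAndRetries(
            Supplier<CompletableFuture<T>> attempt, Duration timeout, int retries) {
        // orTimeout fails this stage with TimeoutException if the attempt
        // does not complete in time (e.g. a reader stuck in compacted data).
        CompletableFuture<T> bounded =
                attempt.get().orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
        if (retries <= 0) {
            return bounded; // out of budget: let the failure propagate
        }
        // On any failure (including timeout), start a fresh attempt.
        return bounded
                .handle((v, e) -> e == null
                        ? CompletableFuture.completedFuture(v)
                        : withTimeoutAndRetries(attempt, timeout, retries - 1))
                .thenCompose(f -> f);
    }
}
```

Once the retries are exhausted, the caller would get a failed future and could trigger the topic unload plus metrics and logging proposed above.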
Are you willing to submit a PR?
- I'm willing to submit a PR!