Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
- Broker version: 4.0.6
- Client library type: Java (Kafka Source Connector / `pulsar-io-kafka`)
- Deployed on Kubernetes via Function Mesh
Issue Description
When the Kafka Source connector's consumer thread encounters a fatal exception (e.g. due to a transient Pulsar authentication outage), the error notification is silently lost and the connector pod remains alive but permanently idle.
What happens:
- The consumer thread in `KafkaAbstractSource.start()` catches an exception at `KafkaAbstractSource.java` L192-L195:

  ```java
  } catch (Exception e) {
      LOG.error("Error while processing records", e);
      notifyError(e);
      break; // consumer thread exits permanently
  }
  ```
- `notifyError(e)` (in `AbstractPushSource`) enqueues an `ErrorNotifierRecord` onto a bounded `LinkedBlockingQueue(1000)` via `consume()` -> `queue.put()`.
- The intent is that the instance thread dequeues it via `readNext()`, which throws the exception, causing the function framework to restart the instance.
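The intended flow can be sketched as follows. This is a minimal, self-contained illustration of the PIP-281 pattern described above, not the actual Pulsar code; the class and field names here (`ErrorNotifySketch`, `ErrorRecord`) are hypothetical stand-ins for `AbstractPushSource` and `ErrorNotifierRecord`:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the notifyError()/readNext() handoff:
// the consumer thread enqueues the error as a record, and the
// instance thread rethrows it when it dequeues.
public class ErrorNotifySketch {

    // Wrapper carrying the error, playing the role of ErrorNotifierRecord.
    static final class ErrorRecord {
        final Exception error;
        ErrorRecord(Exception error) { this.error = error; }
    }

    // Bounded queue, analogous to the LinkedBlockingQueue(1000) in AbstractPushSource.
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);

    // Called on the consumer thread; blocks if the queue is full.
    void notifyError(Exception e) throws InterruptedException {
        queue.put(new ErrorRecord(e));
    }

    // Called on the instance thread; rethrows any queued error so the
    // function framework can restart the instance.
    Object readNext() throws Exception {
        Object record = queue.take();
        if (record instanceof ErrorRecord) {
            throw ((ErrorRecord) record).error;
        }
        return record;
    }

    public static void main(String[] args) throws Exception {
        ErrorNotifySketch source = new ErrorNotifySketch();
        source.notifyError(new IllegalStateException("consumer died"));
        try {
            source.readNext();
            System.out.println("no error");
        } catch (IllegalStateException expected) {
            System.out.println("propagated: " + expected.getMessage());
        }
    }
}
```

Note that this handoff only works if the instance thread actually reaches `readNext()`, which is exactly the assumption the bug below violates.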
The bug: If the instance thread is blocked in sendOutputMessage() (e.g. when Pulsar client cannot contact an external authz service because of a Cloudflare outage 🤦), it never returns to call readNext(). The ErrorNotifierRecord sits in the queue (or queue.put() blocks if the queue is full), and the error is never propagated. The consumer thread is dead, the instance thread is stuck, but the pod passes liveness/readiness checks - it's alive but doing no work.
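The failure mode above can be reproduced in isolation. The following is a hypothetical, self-contained demo (not Pulsar code): the `CountDownLatch` stands in for a `sendOutputMessage()` call that never returns, so the queued error is never observed:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Demonstrates the liveness bug: if the "instance thread" never returns
// to poll the queue, a queued error record is never seen and no restart
// is triggered, even though both threads look "healthy" from outside.
public class LostErrorDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
        CountDownLatch neverCompletes = new CountDownLatch(1); // stands in for a blocked sendOutputMessage()

        Thread instanceThread = new Thread(() -> {
            try {
                neverCompletes.await(); // stuck "sending"; never reaches queue.take()
                queue.take();
            } catch (InterruptedException ignored) {
                // shutdown path for this demo
            }
        });
        instanceThread.start();

        // Consumer thread hits a fatal error, enqueues it, and exits.
        queue.put(new RuntimeException("fatal consumer error"));

        Thread.sleep(500); // give the instance thread a chance (it never takes)
        System.out.println("error still queued: " + (queue.peek() != null));
        System.out.println("instance thread alive: " + instanceThread.isAlive());

        instanceThread.interrupt();
        instanceThread.join();
    }
}
```

After 500 ms the error is still sitting in the queue and the instance thread is still alive but stuck, mirroring the stalled-but-healthy pod described above.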
What was expected: The connector should crash (or the instance should restart) so that Kubernetes can restart the pod and recovery can happen automatically once the transient outage resolves.
Why this is a bug: The notifyError() mechanism added in PR #20791 / PIP-281 relies on the instance thread being in a state where it will call readNext(). When the instance thread is blocked on I/O (sending to Pulsar), this assumption is violated and the error signal is silently dropped. This is a liveness bug - the connector appears healthy but is permanently stalled.
Error messages
No error messages beyond the initial LOG.error("Error while processing records", e) on the consumer thread. The instance thread produces no output because it is blocked.
Reproducing the issue
- Deploy a Kafka Source connector (e.g. `KafkaBytesSource`) on Kubernetes via Function Mesh
- Ensure the connector is actively consuming from Kafka and producing to Pulsar
- Cause a transient Pulsar authentication failure (e.g. make the SSO/OAuth token endpoint return 500s)
- The consumer thread will eventually hit an exception when `CompletableFuture.allOf(futures).get()` times out (because messages can't be sent to Pulsar), call `notifyError()`, and break
- The instance thread is blocked in `sendOutputMessage()` trying to write a previously-consumed record to the unauthenticated Pulsar client
- Observe that the pod remains running, passes health checks, but consumes no further messages from Kafka
- Even after the auth outage recovers, consumption does not resume - the consumer thread is dead
Additional information
Related issues and PRs:
- Discussion #19880 - "Should Kafka Source Connector itself after unrecoverable error?"
- PR #20424 - [fix][io] Close the kafka source connector if there is uncaught exception (merged, 3.1.0)
- PR #20698 - [fix][io] Close the kafka source connector got stuck (merged, 3.1.0)
- PR #20795 - [fix][io] Not restart instance when kafka source poll exception (merged, 3.1.0) - this PR introduced the `notifyError()` + `break` pattern
- PR #20791 / PR #20807 (PIP-281) - [improve][io] Add notifyError method on PushSource - added `notifyError()` to `PushSource`/`AbstractPushSource`
- PR #22511 - [fix][io] Kafka Source connector maybe stuck (merged, 3.3.0) - added timeout on futures, but doesn't address this scenario
The fixes in 3.1.0–3.3.0 addressed the case where the consumer thread itself gets stuck, but did not address the case where the consumer thread correctly signals an error via notifyError() but the instance thread is blocked and never reads it.
Possible fix directions:
- Have `notifyError()` call `System.exit(1)` to force a pod restart (this is what we've done as a downstream workaround)
- Have `KafkaAbstractSource` catch the exception and call `close()` on a separate thread with a timeout, then `System.exit(1)` if close doesn't complete
- Have `notifyError()` throw a `RuntimeException` instead of (or in addition to) enqueuing - however, since `notifyError()` runs on the consumer thread, an uncaught exception would only kill that thread (which is already dying via `break`); it would not affect the stuck instance thread unless combined with a `Thread.UncaughtExceptionHandler` that calls `System.exit(1)`
- Add a watchdog/heartbeat mechanism to detect that the consumer thread has died
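The watchdog direction above could look roughly like this. This is a hypothetical sketch, not a proposed patch: the class name `ConsumerThreadWatchdog` and the `onDeath` callback are made up here, and in production `onDeath` would be something like `() -> System.exit(1)` (the demo substitutes a print so the behavior is observable):

```java
// Hypothetical watchdog: a daemon thread that monitors the consumer
// thread and runs an escalation action if the consumer dies while the
// source is supposed to be running.
public class ConsumerThreadWatchdog {

    public static Thread watch(Thread consumerThread, long pollMillis, Runnable onDeath) {
        Thread watchdog = new Thread(() -> {
            try {
                while (consumerThread.isAlive()) {
                    Thread.sleep(pollMillis);
                }
                // Consumer thread exited (e.g. via the break after notifyError):
                // escalate instead of waiting for the instance thread to notice.
                onDeath.run();
            } catch (InterruptedException ignored) {
                // watchdog cancelled during normal shutdown
            }
        }, "consumer-watchdog");
        watchdog.setDaemon(true);
        watchdog.start();
        return watchdog;
    }

    public static void main(String[] args) throws Exception {
        // Consumer that dies immediately, simulating the break after notifyError().
        Thread consumer = new Thread(() -> { });
        consumer.start();

        Thread wd = watch(consumer, 50, () -> System.out.println("watchdog fired"));
        wd.join(2000); // in production, onDeath would call System.exit(1) here
    }
}
```

Unlike the queue-based signal, this escalation does not depend on the instance thread ever returning from `sendOutputMessage()`, which is why it covers the scenario reported here.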
Are you willing to submit a PR?
- Not at this time.