fix: exceptions in KafkaSqlStore do not trigger liveness checking #6021 #6037
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
We have come across a case where after Registry (KafkaSQL) pod has started, h2 SQL exceptions were logged (probably because of topic corruption). The liveness check is expected to prevent the pod from running, but in this case the check was not triggered.
The initial hypothesis was that the
KafkaSqlStore
h2 DB was missing a liveness interceptor. However, I found out that the exception was intentionally ignored. Since this might hide problems, I've created a new exception type and changed the code so it is thrown.However, that change does not address the liveness part. Currently, all exceptions occurring when reading the topic are handled in KafkaSqlSink. There are two cases:
KafkaSqlRegistryStorage
)KafkaSqlSink
, I've decided to add code here that triggers the liveness check under specific circumstances (see comment).I also found out that despite liveness should be triggered in case 2 ii., that will happen only if the HTTP thread waits for the response, which was not done for import operations. I've fixed this as well.
However, because there is no guarantee that an HTTP thread will wait on the response, we actually have a slow memory leak in KafkaSqlCoordinator.
To address 3 and 4, I've added a code that would check for cases where the HTTP thread did not handle a return value or an exception, clean up, and determine if liveness check should be triggered.
I'll create a separate issue to investigate this for 3.x.