
[FLINK-35749] Kafka sink component will lose data when kafka cluster is unavailable for a while #107

Merged
merged 10 commits into from
Jul 11, 2024

Conversation

@JimmyZZZ (Contributor) commented Jul 5, 2024

What is the purpose of the change

Fix a bug where data is lost while the Kafka cluster is unavailable for a while.
The related issue, https://issues.apache.org/jira/browse/FLINK-35749, describes how to reproduce the bug.

Brief change log

The key problem is in WriterCallback#onCompletion of KafkaWriter:

    mailboxExecutor.submit(
            () -> {
                // Checking for exceptions from previous writes
                checkAsyncException();
            },
            "Update error metric");

Calling 'mailboxExecutor.submit' without inspecting the returned future means the exception is never rethrown: 'asyncProducerException' is reset to null, yet the job continues as if nothing happened. The fix is to use 'mailboxExecutor.execute' instead of 'mailboxExecutor.submit'.
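The difference in exception visibility can be illustrated with plain java.util.concurrent primitives, a simplified stand-in for Flink's MailboxExecutor (class and method names here are illustrative, not from the PR):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class SubmitVsExecute {

    static final Runnable FAILING = () -> {
        throw new RuntimeException("Kafka send failed");
    };

    // submit()-style: the runnable is wrapped in a FutureTask, so the
    // exception is captured inside the Future instead of reaching the caller.
    static String runWrapped() {
        FutureTask<Void> wrapped = new FutureTask<>(FAILING, null);
        wrapped.run(); // completes "normally" from the caller's point of view
        try {
            wrapped.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // only visible via the Future
        } catch (InterruptedException e) {
            return null;
        }
    }

    // execute()-style: the runnable runs unwrapped, so the exception
    // propagates and can fail the task.
    static boolean runUnwrapped() {
        try {
            FAILING.run();
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("submit-style sees: " + runWrapped());
        System.out.println("execute-style propagated: " + runUnwrapped());
    }
}
```

If nobody ever calls get() on the future, the failure is silently dropped, which is exactly the data-loss scenario described above.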

Verifying this change

Added a new test case KafkaWriterFaultToleranceITCase to cover this bug.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

boring-cyborg bot commented Jul 5, 2024

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@AHeise (Contributor) left a comment


Thank you very much for your contribution. The actual fix looks good but you have some additional changes that are not necessary imho.

We also need to add a test case for the fix. In your description, you said it's easy to simulate when you set retries to a low value (don't use 0) and then Kafka becomes unavailable. So please add a test to KafkaWriterITCase where you call KAFKA_CONTAINER.stop() and check the result. Don't forget to start it again in a finally block, though. It might even be safer to create a new ITCase like KafkaWriterFaultToleranceITCase where you have more control over the container.
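The stop-in-try, restart-in-finally shape being suggested can be sketched like this. FakeContainer is a stand-in for Testcontainers' KafkaContainer (the real test would exercise the writer and assert that the async exception surfaces); all names here are hypothetical:

```java
public class ContainerStopPattern {

    // Stand-in for Testcontainers' KafkaContainer.
    static class FakeContainer {
        private boolean running = true;
        void stop()  { running = false; }
        void start() { running = true; }
        boolean isRunning() { return running; }
    }

    static boolean runScenario(FakeContainer container) {
        boolean sawExpectedFailure = false;
        try {
            container.stop();                 // simulate the broker outage
            // writer.write(...)/flush(...) would now time out; the real test
            // asserts that checkAsyncException() rethrows the failure.
            sawExpectedFailure = !container.isRunning();
        } finally {
            container.start();                // always restart for later tests
        }
        return sawExpectedFailure;
    }

    public static void main(String[] args) {
        FakeContainer kafka = new FakeContainer();
        System.out.println("failure observed: " + runScenario(kafka));
        System.out.println("container restarted: " + kafka.isRunning());
    }
}
```

The finally block matters because a stopped shared container would poison every test that runs afterwards.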

// fail over, which has been fixed in [FLINK-31305]. And using
// mailboxExecutor.execute() is better than
// mailboxExecutor.submit() since submit will swallow exceptions inside.
mailboxExecutor.execute(
@AHeise (Contributor)

Yes, this is the actual fix. Using execute instead of submit. This bug got introduced through FLINK-31305.

Note: submit does not swallow exceptions. It's an anti-pattern to use submit without looking at the Future. The future holds the result/exception to be handled async somewhere. If we bubble up the exception the task dies without the call-site being able to properly react to it.
So please remove the second part of your comment from "And using ...".

I will amend the javadoc of the mailbox executor to make it clear that the executor indeed does not bubble up exception in submit.

@JimmyZZZ (Contributor Author)

I agree, my description was not exact: if we use submit and get the returned future, the exception will also be thrown.

I'll remove this part: "And using ...".

@JimmyZZZ (Contributor Author)

@AHeise Removed now.

Contributor

Good catch, thanks for fixing this!

Contributor

As a follow-up, I opened and resolved https://issues.apache.org/jira/browse/FLINK-35796.

Comment on lines 416 to 429
// In the old version, asyncProducerException was set to null here, which caused another
// bug [FLINK-35749]: once asyncProducerException is set to null here, the MailboxExecutor
// thread in WriterCallback.onCompletion may also invoke checkAsyncException and get a
// chance to set asyncProducerException to null before flush() and write() see the
// exception. After that, flush() and write() can continue writing the next message batch
// to Kafka, and the old in-flight batch with exceptions is lost.
// We want KafkaWriter to fail fast once an exception is thrown while sending to Kafka
// and trigger a global restart, after which asyncProducerException is initialized as
// null again.
@AHeise (Contributor)

I don't think this change is necessary at all. You can see that we now have inflated error-count metrics in all the tests that you adjusted (which should be reverted as well).

The issue is not that we reset the exception. It's that it doesn't bubble up (see below). We actually have a clear contract on the volatile asyncProducerException: it's set in the onCompletion and read+reset in the main thread (which is the mailbox thread). So we never have a chance to concurrently reset the exception and lose it somehow.
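That single-writer/single-reader contract on the volatile field can be sketched as follows (a minimal illustration, with assumed names, not the actual KafkaWriter code):

```java
public class AsyncErrorHolder {

    // Set by the producer callback thread, read and reset only by the
    // mailbox (task) thread, so a reset can never race with a set-then-lose.
    private volatile Exception asyncProducerException;

    // Called from the Kafka producer's I/O thread (onCompletion).
    public void recordAsyncException(Exception e) {
        if (e != null && asyncProducerException == null) {
            asyncProducerException = e; // keep only the first exception
        }
    }

    // Called only from the mailbox thread (write/flush/close paths).
    public void checkAsyncException() throws Exception {
        Exception e = asyncProducerException;
        if (e != null) {
            asyncProducerException = null; // safe: single reader/resetter
            throw e;
        }
    }

    public static void main(String[] args) {
        AsyncErrorHolder holder = new AsyncErrorHolder();
        holder.recordAsyncException(new RuntimeException("send failed"));
        try {
            holder.checkAsyncException();
            System.out.println("no exception");
        } catch (Exception e) {
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```

Because only the mailbox thread both reads and resets the field, the original bug was not a concurrent reset but the swallowed rethrow in submit, as noted above.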

@JimmyZZZ (Contributor Author)

Thanks for the reminder, I need some time to think this part over.

@JimmyZZZ (Contributor Author)

@AHeise I've removed the earlier modification to the reset logic in checkAsyncException(), and I understand what you mean about "we never have a chance to concurrently reset the exception" and agree with you.

Btw, I'm still a little curious: if we don't reset asyncProducerException to null in checkAsyncException(), what problems will we run into? The comment from [FLINK-31305] says to propagate the first exception since the number of exceptions could be large. Is this the only reason we throw the exception only once? That doesn't seem very persuasive to me.

@JimmyZZZ (Contributor Author)

If we keep asyncProducerException until the KafkaWriter task is restarted and it is reinitialized to null, that seems safer in case someone modifies the related logic later.

@AHeise (Contributor) commented Jul 9, 2024

It's a good point to raise if we need to reset at all or not. I'm assuming @mas-chen did it to emulate the previous behavior.

From what I can see, the only real downside is that we rethrow the exception in close (which is always called also in case of errors). That could prevent some proper cleanup and will count exceptions twice in most cases, which will inflate the metrics. I think the latter point has quite a bit of side-effects:

I'm expecting most customers to have some alerts on the exceptions metrics, which could mean that it's a behavior change that would alert more users than in previous versions. So in the worst case, some poor data engineer will be paged in the middle of the night because of that. And I would expect a few alerts to be changed to be less noisy.

All in all, I think that resetting the exception would be good IF we are all sure that there won't be any exception missed. Being sure that we can't have corrupted state is of course more important than having accurate metrics but ideally we have both.

@JimmyZZZ (Contributor Author)

Very good point about a "data engineer being paged in the middle of the night" once the alerts on the exception metrics change. I think we need to be very cautious about future modifications to the exception logic, and the new test KafkaWriterFaultToleranceITCase can help address this concern.

@AHeise (Contributor) commented Jul 5, 2024

Please also add some more details to the PR description and commit message in accordance with the guidelines. In particular, please use the proper GitHub PR template.

@JimmyZZZ (Contributor Author) commented Jul 8, 2024

@AHeise Added the new test case KafkaWriterFaultToleranceITCase, did some refactoring to extract shared pieces for KafkaWriterFaultToleranceITCase and KafkaWriterITCase, and provided more details in this PR following the GitHub PR template.

Please help review again, thanks very much.

@JimmyZZZ JimmyZZZ requested a review from AHeise July 8, 2024 17:32
@AHeise (Contributor) left a comment


LGTM. Production code is good. Test code is exhaustive and still minimal. I just added a remark on the producer settings in the test. We may be able to speed up the tests significantly. PTAL.

Do you need a committer that pushes the code for you? LMK if that's the case. I still need to get back into the community to understand who is committer and who is contributor ;).

Comment on lines 38 to 42
public class KafkaWriterFaultToleranceITCase extends KafkaWriterTestBase {
private static final String INIT_KAFKA_RETRIES = "1";
private static final String INIT_KAFKA_REQUEST_TIMEOUT_MS = "2000";
private static final String INIT_KAFKA_MAX_BLOCK_MS = "3000";
private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "4000";
@AHeise (Contributor)

Tests look good. I'm wondering if we can/need to tweak these numbers to make the test hopefully not flaky.

We have to make good tradeoffs here:

  • The overall test shouldn't take too long and we are relying on timeouts here.
  • Timeouts shouldn't be too small to compensate for infra hiccups.
  • Retries should be non-zero.

On first glance, the values all look good in that regard.

However, now at a second thought: do we ever want to have a successful request in this test? Maybe we could set really low timeouts and retry = 0 and provoke the failures even quicker?

@JimmyZZZ (Contributor Author)

Yes, the purpose is to trigger the timeout exception; retries=0 or 1 makes no difference here. I have changed it to retries=0 now.

@JimmyZZZ (Contributor Author)

Since we want to mock the scenario of Kafka being unavailable, I'm worried that a bad network on the test machine could also cause timeouts, so I keep the other timeout thresholds at 1 second or more here.
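The settings under discussion map to standard Kafka producer config keys. A sketch of what the final test configuration might look like (values mirror the constants quoted above, with retries lowered to 0 per the review; the class name is illustrative):

```java
import java.util.Properties;

public class FaultToleranceProducerProps {

    static Properties buildProps() {
        Properties props = new Properties();
        props.setProperty("retries", "0");                // fail fast, no retry
        props.setProperty("request.timeout.ms", "2000");  // per-request timeout
        props.setProperty("max.block.ms", "3000");        // bound send()/metadata blocking
        props.setProperty("delivery.timeout.ms", "4000"); // overall bound incl. retries
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Note that Kafka requires delivery.timeout.ms >= request.timeout.ms + linger.ms, which these values satisfy, so the producer accepts the configuration while still failing within a few seconds once the broker is down.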

import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;

/** Test base for KafkaWriter. */
public abstract class KafkaWriterTestBase {
Contributor

Thank you very much for avoiding duplicate code.

@AHeise AHeise requested a review from mas-chen July 9, 2024 07:58
@JimmyZZZ (Contributor Author) commented Jul 9, 2024

@AHeise Yes, I need your help to reach someone who can push the code, since I'm also not sure who can do it. Thanks a lot.

@AHeise (Contributor) commented Jul 10, 2024

There is an ArchUnit rule violation:

2024-07-09T13:48:15.4851704Z [ERROR]   Architecture Violation [Priority: MEDIUM] - Rule 'ITCASE tests should use a MiniCluster resource or extension' was violated (1 times):
2024-07-09T13:48:15.4853792Z org.apache.flink.connector.kafka.sink.KafkaWriterFaultToleranceITCase does not satisfy: only one of the following predicates match:
2024-07-09T13:48:15.4856292Z * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension
2024-07-09T13:48:15.4859585Z * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv
2024-07-09T13:48:15.4862798Z * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension
2024-07-09T13:48:15.4864813Z * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension
2024-07-09T13:48:15.4871237Z  or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule

I dug into it:

The rule doesn't make any sense for ITCases and should be reserved for E2E tests. Please add it to the known archunit violations and I will clean up at a later point.

@JimmyZZZ (Contributor Author)

Added KafkaWriterFaultToleranceITCase to the known ArchUnit violations, please take a look. @AHeise

@AHeise (Contributor) commented Jul 10, 2024

At this point, we just try to get the build green. You can check locally by running mvn verify.

@JimmyZZZ JimmyZZZ closed this Jul 11, 2024
@JimmyZZZ JimmyZZZ reopened this Jul 11, 2024
@JimmyZZZ (Contributor Author)

Checked, it should be green now.

@AHeise (Contributor) commented Jul 11, 2024

Retriggering CI.

@JimmyZZZ (Contributor Author)

@AHeise It's strange that only this test failed: compile_and_test (1.17.2, 8, 11) / compile_and_test (8)

I dug into it and found that the two tests modified in this PR are green:
2024-07-11T06:44:35.4459342Z [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 37.521 s - in org.apache.flink.connector.kafka.sink.KafkaWriterFaultToleranceITCase
2024-07-11T06:43:56.5827523Z [INFO] Tests run: 19, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 14.883 s - in org.apache.flink.connector.kafka.sink.KafkaWriterITCase

and it seems something is wrong with the env or network, because I found this log in the failed test: 2024-07-11T07:19:03.3807103Z ##[error]The action 'Compile and test' has timed out after 50 minutes.

So could you help take a look or just trigger it again?

@AHeise (Contributor) commented Jul 11, 2024

The hanging test comes from the source DynamicKafkaSourceITTest.IntegrationTests.testIdleReader. I don't see any sink usage in the test, so the failure is unrelated.

I'm rerunning the job and will merge soonish.

@AHeise AHeise merged commit 4429b78 into apache:main Jul 11, 2024
13 checks passed
boring-cyborg bot commented Jul 11, 2024

Awesome work, congrats on your first merged pull request!
