Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35228][Connectors/Kafka] Fix: DynamicKafkaSource does not read re-added topic for the same cluster #97

Merged
merged 1 commit into from
Apr 29, 2024

Conversation

IgnasD
Copy link
Contributor

@IgnasD IgnasD commented Apr 24, 2024

Saw some dormant code that was collecting activeTopicPartitions which got used nowhere. I believe the cleanup was already intended by the original developer, but just got lost for some reason.

So I've finished up the cleanup logic and added a test case to cover the issue in question.

Copy link

boring-cyborg bot commented Apr 24, 2024

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@IgnasD IgnasD changed the title [FLINK-35138][Connectors/Kafka] Fix DynamicKafkaSourceEnumerator removed topics state cleanup [FLINK-35228][Connectors/Kafka] Fix DynamicKafkaSourceEnumerator removed topics state cleanup Apr 24, 2024
@IgnasD
Copy link
Contributor Author

IgnasD commented Apr 24, 2024

Sorry, I forgot to change issue number while copying from another PR as a template. Fixed now.

@MartijnVisser MartijnVisser requested a review from mas-chen April 24, 2024 21:14
Copy link
Contributor

@mas-chen mas-chen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution! The issue you are seeing and integration test make sense to me but I don't understand how you have fixed it. The changes to DynamicKafkaSourceEnumerator looks like solely refactoring. Did you forget to commit the fix?

@IgnasD
Copy link
Contributor Author

IgnasD commented Apr 25, 2024

The fix is that previously activeTopicPartitions were being constructed but used nowhere. Now I'm constructing KafkaSourceEnumState and passing activeTopicPartitions to it.

And yes, while doing that some refactoring came in naturally to avoid code duplication.

@mas-chen
Copy link
Contributor

Thanks for explaining, I missed that! Rerunning CI, the PR LGTM

Copy link
Contributor

@mas-chen mas-chen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left one comment. Can you also fixup the PR title? e.g. your jira ticket title is a bit more informative: "DynamicKafkaSource does not read re-added topic for the same cluster"

@IgnasD IgnasD changed the title [FLINK-35228][Connectors/Kafka] Fix DynamicKafkaSourceEnumerator removed topics state cleanup [FLINK-35228][Connectors/Kafka] Fix: DynamicKafkaSource does not read re-added topic for the same cluster Apr 26, 2024
@IgnasD
Copy link
Contributor Author

IgnasD commented Apr 26, 2024

Changed the title as requested. Also, I've added filtering to unassignedInitialPartitions as suggested and covered it with a test case.

@mas-chen
Copy link
Contributor

Rerunning CI--seems to be flaky behavior in IT tests:

2024-04-27T19:26:47.5915234Z 19:26:47,500 [    Flat Map (1/1)#0] WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Flat Map (1/1)#0 (ae4b22de7586eaf3e02a3d7930e9290c_0a448493b4782967b150582570326227_0_0) switched from RUNNING to FAILED with failure cause:
2024-04-27T19:26:47.5917050Z org.apache.flink.test.util.SuccessException: null
2024-04-27T19:26:47.5918591Z 	at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase$10.flatMap(KafkaConsumerTestBase.java:1331) ~[test-classes/:?]
2024-04-27T19:26:47.5973189Z 	at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase$10.flatMap(KafkaConsumerTestBase.java:1302) ~[test-classes/:?]
2024-04-27T19:26:47.5975836Z 	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.5978671Z 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.5982139Z 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.5985635Z 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.5988935Z 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.5991838Z 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.5994539Z 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.5997203Z 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.5999551Z 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6001791Z 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-runtime-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6003944Z 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-runtime-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6005833Z 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [flink-runtime-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6007601Z 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-runtime-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6008802Z 	at java.lang.Thread.run(Thread.java:840) [?:?]
2024-04-27T19:26:47.6011187Z 19:26:47,500 [    Flat Map (1/1)#0] INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Flat Map (1/1)#0 (ae4b22de7586eaf3e02a3d7930e9290c_0a448493b4782967b150582570326227_0_0).
2024-04-27T19:26:47.6015163Z 19:26:47,501 [flink-pekko.actor.default-dispatcher-9] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Flat Map (1/1)#0 ae4b22de7586eaf3e02a3d7930e9290c_0a448493b4782967b150582570326227_0_0.
2024-04-27T19:26:47.6020124Z 19:26:47,501 [flink-pekko.actor.default-dispatcher-9] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Flat Map (1/1) (ae4b22de7586eaf3e02a3d7930e9290c_0a448493b4782967b150582570326227_0_0) switched from RUNNING to FAILED on b7795d57-330f-46e5-80b4-9fed34414ad2 @ localhost (dataPort=-1).
2024-04-27T19:26:47.6022736Z org.apache.flink.test.util.SuccessException: null
2024-04-27T19:26:47.6024360Z 	at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase$10.flatMap(KafkaConsumerTestBase.java:1331) ~[test-classes/:?]
2024-04-27T19:26:47.6026692Z 	at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase$10.flatMap(KafkaConsumerTestBase.java:1302) ~[test-classes/:?]
2024-04-27T19:26:47.6096328Z 	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6101993Z 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6105266Z 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6108323Z 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6111209Z 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6113917Z 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6116391Z 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6118929Z 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6121163Z 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-streaming-java-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6123314Z 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-runtime-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6125337Z 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[flink-runtime-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6127149Z 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-runtime-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6129175Z 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-runtime-1.18.1.jar:1.18.1]
2024-04-27T19:26:47.6130366Z 	at java.lang.Thread.run(Thread.java:840) ~[?:?]
2024-04-27T19:26:47.6343309Z 19:26:47,502 [flink-pekko.actor.default-dispatcher-9] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Count elements from the topics (eccfd8eb6c25a421fae454d2b87488f3) switched from state RUNNING to FAILING.
2024-04-27T19:26:47.6356058Z org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

Copy link
Contributor

@mas-chen mas-chen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the iterations! LGTM

@mas-chen mas-chen merged commit 00c9c8c into apache:main Apr 29, 2024
13 checks passed
Copy link

boring-cyborg bot commented Apr 29, 2024

Awesome work, congrats on your first merged pull request!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants