-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed the problem of using the same executor to process schema change requests and flush event requests, resulting in blocking timeout. #3858
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @linjianchang's quick fix! Just left some trivial comments.
@@ -96,6 +97,9 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio | |||
protected transient SchemaManager schemaManager; | |||
protected transient TableIdRouter router; | |||
|
|||
/** Executor service to execute handle event from operator. */ | |||
private final ExecutorService runInEventFromOperatorExecutor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep its name consistent with regular schema coordinator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
already modified
@@ -253,6 +260,7 @@ public final CompletableFuture<CoordinationResponse> handleCoordinationRequest( | |||
public final void handleEventFromOperator( | |||
int subTaskId, int attemptNumber, OperatorEvent event) { | |||
runInEventLoop( | |||
runInEventFromOperatorExecutor, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems incorrect. handleEventFromOperator
should be submitted to the same single threaded executor like other methods (so they won't be scheduled simultaneously with other critical methods). Only the SchemaCoordinator#startSchemaChange
needs to be wrapped in another executor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
already modified
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC this problem will only occur if:
- Distributed schema evolution topology is created
- Flush success event takes a while to finish
- The last schema operator initiates request before any flush succeeds
Thus, The handler of FlushSuccessEvent will wait for schema evolution to finish, but the busy-loop is still waiting for collecting all flush success events.
Could you please add a test case to verify this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test case org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaEvolveTest#testLenientSchemaEvolution() has been covered this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid it's not sufficient because runtime unit tests uses ValuesDataSink
only. It takes no time to flush and might not be able to verify this.
@@ -325,6 +337,7 @@ public final void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpoi | |||
* directly, make sure you're running heavy logics inside, or the entire job might hang! | |||
*/ | |||
protected void runInEventLoop( | |||
final ExecutorService coordinatorExecutor, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's nice to allow specifying ExecutorService
when
calling runInEventLoop
. Maybe regular/SchemaCoordinator
could also invoke this instead of this:
schemaChangeThreadPool.submit(
() -> {
try {
applySchemaChange(originalEvent, deducedSchemaChangeEvents);
} catch (Throwable t) {
failJob(
"Schema change applying task",
new FlinkRuntimeException(
"Failed to apply schema change event.", t));
throw t;
}
});
…or to process schema change requests and flush event requests, resulting in blocking timeout.
032e560
to
0eaf9ef
Compare
Already modified according to comment @yuxiqian |
Seems there's something wrong with internal state switching logic. Would @linjianchang like to take a further look? |
When source generate metadata events in parallel for each table,There is a problem of timeout waiting for flush to complete. Exception be like: