Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void testRunJarInApplicationMode() throws Exception {
FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// Default: shutdownOnAttachedExit is null (true), should include -sae
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
Expand All @@ -80,6 +81,7 @@ public void testRunJarInClusterMode() throws Exception {
List<String> commandLine1 =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// Default: shutdownOnAttachedExit is null (true), should include -sae
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));
Expand All @@ -106,11 +108,25 @@ public void testRunJarInLocalMode() throws Exception {
FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// Default: shutdownOnAttachedExit is null (true), should include -sae
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@Test
public void testRunJarWithShutdownOnAttachedExitDisabled() throws Exception {
FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
flinkParameters.setShutdownOnAttachedExit(false); // Explicitly disable
flinkParameters.setFlinkVersion(">=1.12");
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// When explicitly disabled, should NOT include -sae parameter (rely on Flink default)
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@Test
public void testRunSql() throws Exception {
FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,11 @@ private static List<String> buildRunCommandLineForOthers(TaskExecutionContext ta
}

// If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated
// abruptly
// abruptly. This prevents resource leakage and duplicate tasks during worker failover.
// The task status will be synchronized with the cluster job status
args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
if (!Boolean.FALSE.equals(flinkParameters.getShutdownOnAttachedExit())) {
args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
}

// -s -yqu -yat -yD -D
if (StringUtils.isNotEmpty(others)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,22 @@ public class FlinkParameters extends AbstractParameters {
*/
private String rawScript;

/**
* Shutdown on attached exit (-sae parameter)
*
* <p>When enabled (default), Flink CLI will attempt to shutdown the cluster when the CLI
* terminates abruptly. This helps prevent resource leakage and duplicate tasks
* during worker failover scenarios.
*
* <p>When disabled, no parameter is added and we rely on Flink's default behavior
* (which is shutdown-on-attached-exit=false).
*
* <p>Default: true (enabled to prevent resource leakage during failover)
*
* @see FlinkArgsUtils#buildRunCommandLine
*/
private Boolean shutdownOnAttachedExit;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we don't support take-over flink job when failover, these change might cause the flink job duplicated run on YARN?
And, it's better to make the default value to TRUE, do not break compatibility, as this is essentially a Flink bug.

Copy link
Author

@chris-fast chris-fast Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I every agree with you on the compatibility pointkeeping the default as TRUE is definitely the safest move.

And I want to clarify a few things regarding the underlying mechanism, though:

1.It's not really a "Flink bug",-sae parameter was designed to prevent resource leaks (zombie clusters), not to handle duplicate submissions.

2.Relying on -sae=true to prevent "double runs" is actually pretty unreliable. If a worker hits a hard crash (like an OOM or power outage), the CLI dies instantly and never gets a chance to send the shutdown signal to the cluster. So, the job keeps running, and a retry will still cause a duplicate.

3.The better way to handle idempotency is via YARN Application Tags (e.g., using the ProcessInstanceId) and checking if that tag exists before submitting.

I think that "idempotency check" deserves to be a future optimization feature on its own. It’s probably better to keep it out of this current PR so we don't block the merge.

Thanks a lot for the feedback—I actually learned a ton digging into this! I’d be more than happy to help contribute code for that future optimization feature, too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that "idempotency check" deserves to be a future optimization feature on its own. It’s probably better to keep it > out of this current PR so we don't block the merge.

+1

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that "idempotency check" deserves to be a future optimization feature on its own. It’s probably better to keep it > out of this current PR so we don't block the merge.

+1

Totally agree. Thanks! I'll push the new code right now

Copy link
Member

@ruanwenjun ruanwenjun Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chris-fast What I’m trying to say is that we shouldn’t break compatibility here, otherwise will lead to more problems e.g failover. Regardless of how we deal with the failover issue in the future, using -sea is not a bad thing.

Adapting the upper layer just to work around a bug in a specific version of a lower-level component isn’t a sustainable approach. If different versions of the lower layer each have their own issues, does that mean the upper layer needs to keep adding special handling for all of them? That would make the system increasingly complex and fragile.

Also, I don’t quite understand the statement that “it’s not really a Flink bug.” From my perspective, the unexpected behavior originates from Flink’s side, so it’s hard to see why it wouldn’t be considered a bug there.

The most important thing is it hard to explain to users under what circumstances they should enable/disable shutdownOnAttachedExit. If I am the user, I will ask "If this parameter is turned off, will the process not exit? Under what circumstances would we want the process to stay alive instead of exiting".


public ResourceInfo getMainJar() {
return mainJar;
}
Expand Down Expand Up @@ -250,6 +266,14 @@ public void setRawScript(String rawScript) {
this.rawScript = rawScript;
}

public Boolean getShutdownOnAttachedExit() {
return shutdownOnAttachedExit;
}

public void setShutdownOnAttachedExit(Boolean shutdownOnAttachedExit) {
this.shutdownOnAttachedExit = shutdownOnAttachedExit;
}

@Override
public boolean checkParameters() {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void testRunJarInApplicationMode() throws Exception {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// Default: shutdownOnAttachedExit is null (true), should include -sae
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
Expand All @@ -80,6 +81,7 @@ public void testRunJarInClusterMode() {
List<String> commandLine1 =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// Default: shutdownOnAttachedExit is null (true), should include -sae
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));
Expand All @@ -106,11 +108,50 @@ public void testRunJarInLocalMode() {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// Default: shutdownOnAttachedExit is null (true), should include -sae
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@Test
public void testRunJarWithShutdownOnAttachedExitEnabled() {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
flinkParameters.setShutdownOnAttachedExit(true); // Explicitly enable
flinkParameters.setFlinkVersion(">=1.12");
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// When explicitly set to true (same as default), should include -sae parameter
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@Test
public void testRunJarWithShutdownOnAttachedExitDisabled() {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
flinkParameters.setShutdownOnAttachedExit(false); // Explicitly disable
flinkParameters.setFlinkVersion(">=1.12");
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// When explicitly disabled, should NOT include -sae parameter (rely on Flink default)
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@Test
public void testRunJarWithShutdownOnAttachedExitInApplicationMode() {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
flinkParameters.setShutdownOnAttachedExit(true); // Even if enabled
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// APPLICATION mode: when enabled, should include -sae (same as other modes)
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@Test
public void testRunSql() {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
Expand Down
3 changes: 3 additions & 0 deletions dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,9 @@ export default {
'set the complement task thread to a reasonable value to avoid too large impact on the server.',
task_manager_number: 'TaskManager Number',
task_manager_number_tips: 'Please enter TaskManager number',
shutdown_on_attached_exit: 'Shutdown On Attached Exit',
shutdown_on_attached_exit_tips:
'Attempt to shutdown cluster when CLI terminates abruptly. Only suitable for attached mode, recommended to disable for APPLICATION mode',
grpc_url: 'gRPC Url',
grpc_url_tips: 'Please Enter gRPC Url',
grpc_url_format_tips:
Expand Down
3 changes: 3 additions & 0 deletions dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,9 @@ export default {
'如果存在大量任务需要补数时,可以利用自定义并行度将补数的任务线程设置成合理的数值,避免对服务器造成过大的影响',
task_manager_number: 'TaskManager数量',
task_manager_number_tips: '请输入TaskManager数量',
shutdown_on_attached_exit: 'Shutdown On Attached Exit',
shutdown_on_attached_exit_tips:
'当CLI异常终止时,尝试关闭集群。仅适用于attached模式,APPLICATION模式建议关闭',
grpc_url: '请求地址',
grpc_url_tips: '请输入请求地址(必填)',
grpc_url_format_tips: '请求地址格式不正确, gRPC仅允许host:port格式',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export default defineComponent({
}

const cancelToHandle = () => {
ctx.emit('update:show', showRef)
ctx.emit('update:show', showRef.value)
}

const renderDownstreamDependencies = () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,17 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
value: model.parallelism
},
useYarnQueue(),
{
type: 'switch',
field: 'shutdownOnAttachedExit',
name: t('project.node.shutdown_on_attached_exit'),
span: 24,
props: {
'checked-value': true,
'unchecked-value': false
},
value: true
},
{
type: 'input',
field: 'mainArgs',
Expand Down
Loading