Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,8 @@ dolphinscheduler-master/logs
dolphinscheduler-api/logs
__pycache__
ds_schema_check_test
.claude
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove unnessnary change.

.zcf/
.spec-workflow/
.serena/
**/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ public void testRunJarInApplicationMode() throws Exception {
FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// APPLICATION mode should NOT include -sae parameter (detached mode on YARN)
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

Expand All @@ -80,24 +81,25 @@ public void testRunJarInClusterMode() throws Exception {
List<String> commandLine1 =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// Default: shutdownOnAttachedExit is null/false, should NOT include -sae
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));

flinkParameters.setFlinkVersion("<1.10");
List<String> commandLine2 =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine2));

flinkParameters.setFlinkVersion(">=1.12");
List<String> commandLine3 =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine3));
}

Expand All @@ -106,8 +108,9 @@ public void testRunJarInLocalMode() throws Exception {
FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// Default: shutdownOnAttachedExit is null/false, should NOT include -sae
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,10 @@ private static List<String> buildRunCommandLineForOthers(TaskExecutionContext ta
// If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated
// abruptly
// The task status will be synchronized with the cluster job status
args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
// Note: This parameter is now configurable via UI, default is false (disabled)
if (Boolean.TRUE.equals(flinkParameters.getShutdownOnAttachedExit())) {
args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
}

// -s -yqu -yat -yD -D
if (StringUtils.isNotEmpty(others)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ public class FlinkParameters extends AbstractParameters {
*/
private String rawScript;

/**
* Shutdown on attached exit (-sae parameter)
*
* <p>When enabled, Flink CLI will attempt to shutdown the cluster when the CLI
* terminates abruptly. This is only suitable for attached mode (CLUSTER/LOCAL).
*
* <p>For APPLICATION mode, this should typically be disabled as the job runs
* in detached mode on YARN.
*
* <p>Default: false (disabled for safety)
*
* @see FlinkArgsUtils#buildRunCommandLine
*/
private Boolean shutdownOnAttachedExit;

public ResourceInfo getMainJar() {
return mainJar;
}
Expand Down Expand Up @@ -250,6 +265,14 @@ public void setRawScript(String rawScript) {
this.rawScript = rawScript;
}

public Boolean getShutdownOnAttachedExit() {
return shutdownOnAttachedExit;
}

public void setShutdownOnAttachedExit(Boolean shutdownOnAttachedExit) {
this.shutdownOnAttachedExit = shutdownOnAttachedExit;
}

@Override
public boolean checkParameters() {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ public void testRunJarInApplicationMode() throws Exception {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// APPLICATION mode should NOT include -sae parameter (detached mode on YARN)
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

Expand All @@ -80,24 +81,25 @@ public void testRunJarInClusterMode() {
List<String> commandLine1 =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// Default: shutdownOnAttachedExit is null/false, should NOT include -sae
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));

flinkParameters.setFlinkVersion("<1.10");
List<String> commandLine2 =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine2));

flinkParameters.setFlinkVersion(">=1.12");
List<String> commandLine3 =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine3));
}

Expand All @@ -106,8 +108,48 @@ public void testRunJarInLocalMode() {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// Default: shutdownOnAttachedExit is null/false, should NOT include -sae
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@Test
public void testRunJarWithShutdownOnAttachedExitEnabled() {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
flinkParameters.setShutdownOnAttachedExit(true); // Explicitly enable
flinkParameters.setFlinkVersion(">=1.12");
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// When explicitly enabled, should include -sae parameter
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@Test
public void testRunJarWithShutdownOnAttachedExitDisabled() {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
flinkParameters.setShutdownOnAttachedExit(false); // Explicitly disable
flinkParameters.setFlinkVersion(">=1.12");
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// When explicitly disabled, should NOT include -sae parameter
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@Test
public void testRunJarWithShutdownOnAttachedExitInApplicationMode() {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
flinkParameters.setShutdownOnAttachedExit(true); // Even if enabled
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

// APPLICATION mode with shutdownOnAttachedExit=true should include -sae
// (User explicitly wants it, even though it may cause issues)
Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

Expand Down
3 changes: 3 additions & 0 deletions dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,9 @@ export default {
'set the complement task thread to a reasonable value to avoid too large impact on the server.',
task_manager_number: 'TaskManager Number',
task_manager_number_tips: 'Please enter TaskManager number',
shutdown_on_attached_exit: 'Shutdown On Attached Exit',
shutdown_on_attached_exit_tips:
'Attempt to shutdown cluster when CLI terminates abruptly. Only suitable for attached mode, recommended to disable for APPLICATION mode',
grpc_url: 'gRPC Url',
grpc_url_tips: 'Please Enter gRPC Url',
grpc_url_format_tips:
Expand Down
3 changes: 3 additions & 0 deletions dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,9 @@ export default {
'如果存在大量任务需要补数时,可以利用自定义并行度将补数的任务线程设置成合理的数值,避免对服务器造成过大的影响',
task_manager_number: 'TaskManager数量',
task_manager_number_tips: '请输入TaskManager数量',
shutdown_on_attached_exit: 'Shutdown On Attached Exit',
shutdown_on_attached_exit_tips:
'当CLI异常终止时,尝试关闭集群。仅适用于attached模式,APPLICATION模式建议关闭',
grpc_url: '请求地址',
grpc_url_tips: '请输入请求地址(必填)',
grpc_url_format_tips: '请求地址格式不正确, gRPC仅允许host:port格式',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,17 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
value: model.parallelism
},
useYarnQueue(),
{
type: 'switch',
field: 'shutdownOnAttachedExit',
name: t('project.node.shutdown_on_attached_exit'),
span: 24,
props: {
'checked-value': true,
'unchecked-value': false
},
value: false // Default: disabled
},
{
type: 'input',
field: 'mainArgs',
Expand Down
Loading