diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java index 1d116b8b8ec2..3322bb3b7ef4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java @@ -68,8 +68,9 @@ public void testRunJarInApplicationMode() throws Exception { FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION); List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); + // APPLICATION mode should NOT include -sae parameter (detached mode on YARN) Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } @@ -80,8 +81,9 @@ public void testRunJarInClusterMode() throws Exception { List commandLine1 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); + // Default: shutdownOnAttachedExit is null/false, should NOT include -sae Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine1)); flinkParameters.setFlinkVersion("<1.10"); @@ -89,7 +91,7 @@ public void testRunJarInClusterMode() throws Exception { FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine2)); flinkParameters.setFlinkVersion(">=1.12"); @@ -97,7 +99,7 @@ public void testRunJarInClusterMode() throws Exception { FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine3)); } @@ -106,8 +108,9 @@ public void testRunJarInLocalMode() throws Exception { FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL); List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); + // Default: shutdownOnAttachedExit is null/false, should NOT include -sae Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -p 4 -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index fe374c9d8fed..e7526cba03d6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -267,7 +267,9 @@ private static List buildRunCommandLineForOthers(TaskExecutionContext ta // If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated // abruptly // The task status will be synchronized with the cluster job status - args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae + if (Boolean.TRUE.equals(flinkParameters.getShutdownOnAttachedExit())) { + args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae + } // -s -yqu -yat -yD -D if (StringUtils.isNotEmpty(others)) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java index 61ad5d47724a..712de2c27ae5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java @@ -114,6 +114,21 @@ public class FlinkParameters extends AbstractParameters { */ private String rawScript; + /** + * Shutdown on attached exit (-sae parameter) + * + *

When enabled, Flink CLI will attempt to shutdown the cluster when the CLI + * terminates abruptly. This is only suitable for attached mode (CLUSTER/LOCAL). + * + *

For APPLICATION mode, this should typically be disabled as the job runs + * in detached mode on YARN. + * + *

Default: false (disabled for safety) + * + * @see FlinkArgsUtils#buildRunCommandLine + */ + private Boolean shutdownOnAttachedExit; + public ResourceInfo getMainJar() { return mainJar; } @@ -250,6 +265,14 @@ public void setRawScript(String rawScript) { this.rawScript = rawScript; } + public Boolean getShutdownOnAttachedExit() { + return shutdownOnAttachedExit; + } + + public void setShutdownOnAttachedExit(Boolean shutdownOnAttachedExit) { + this.shutdownOnAttachedExit = shutdownOnAttachedExit; + } + @Override public boolean checkParameters() { /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java index b53260bb87eb..4a1aea919a4c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java @@ -68,8 +68,9 @@ public void testRunJarInApplicationMode() throws Exception { FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION); List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); + // APPLICATION mode should NOT include -sae parameter (detached mode on YARN) Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } @@ -80,8 +81,9 @@ public void testRunJarInClusterMode() { List commandLine1 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); + // Default: shutdownOnAttachedExit is null/false, should NOT include -sae Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine1)); flinkParameters.setFlinkVersion("<1.10"); @@ -89,7 +91,7 @@ public void testRunJarInClusterMode() { FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine2)); flinkParameters.setFlinkVersion(">=1.12"); @@ -97,7 +99,7 @@ public void testRunJarInClusterMode() { FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine3)); } @@ -106,8 +108,48 @@ public void testRunJarInLocalMode() { FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL); List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); + // Default: shutdownOnAttachedExit is null/false, should NOT include -sae + Assertions.assertEquals( + "${FLINK_HOME}/bin/flink run -p 4 -c org.example.Main /opt/job.jar", + joinStringListWithSpace(commandLine)); + } + + @Test + public void testRunJarWithShutdownOnAttachedExitEnabled() { + FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER); + flinkParameters.setShutdownOnAttachedExit(true); // Explicitly enable + flinkParameters.setFlinkVersion(">=1.12"); + List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); + + // When explicitly enabled, should include -sae parameter + Assertions.assertEquals( + "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + joinStringListWithSpace(commandLine)); + } + + @Test + public void testRunJarWithShutdownOnAttachedExitDisabled() { + FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER); + flinkParameters.setShutdownOnAttachedExit(false); // Explicitly disable + flinkParameters.setFlinkVersion(">=1.12"); + List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); + + // When explicitly disabled, should NOT include -sae parameter Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar", + joinStringListWithSpace(commandLine)); + } + + @Test + public void testRunJarWithShutdownOnAttachedExitInApplicationMode() { + FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION); + flinkParameters.setShutdownOnAttachedExit(true); // Even if enabled + List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); + + // APPLICATION mode with shutdownOnAttachedExit=true should include -sae + // (User explicitly wants it, even though it may cause issues) + Assertions.assertEquals( + "${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 916893b02a0c..8823c205a673 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -503,6 +503,9 @@ export default { 'set the complement task thread to a reasonable value to avoid too large impact on the server.', task_manager_number: 'TaskManager Number', task_manager_number_tips: 'Please enter TaskManager number', + shutdown_on_attached_exit: 'Shutdown On Attached Exit', + shutdown_on_attached_exit_tips: + 'Attempt to shutdown cluster when CLI terminates abruptly. Only suitable for attached mode, recommended to disable for APPLICATION mode', grpc_url: 'gRPC Url', grpc_url_tips: 'Please Enter gRPC Url', grpc_url_format_tips: diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 46fd12e348fd..2a772c629a1e 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -489,6 +489,9 @@ export default { '如果存在大量任务需要补数时,可以利用自定义并行度将补数的任务线程设置成合理的数值,避免对服务器造成过大的影响', task_manager_number: 'TaskManager数量', task_manager_number_tips: '请输入TaskManager数量', + shutdown_on_attached_exit: 'Shutdown On Attached Exit', + shutdown_on_attached_exit_tips: + '当CLI异常终止时,尝试关闭集群。仅适用于attached模式,APPLICATION模式建议关闭', grpc_url: '请求地址', grpc_url_tips: '请输入请求地址(必填)', grpc_url_format_tips: '请求地址格式不正确, gRPC仅允许host:port格式', diff --git a/dolphinscheduler-ui/src/views/projects/components/dependencies/dependencies-modal.tsx b/dolphinscheduler-ui/src/views/projects/components/dependencies/dependencies-modal.tsx index 207673a2a837..e405f720a346 100644 --- a/dolphinscheduler-ui/src/views/projects/components/dependencies/dependencies-modal.tsx +++ b/dolphinscheduler-ui/src/views/projects/components/dependencies/dependencies-modal.tsx @@ -59,7 +59,7 @@ export default defineComponent({ } const cancelToHandle = () => { - ctx.emit('update:show', showRef) + ctx.emit('update:show', showRef.value) } const renderDownstreamDependencies = () => { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts index aaedf5d2aaa5..4a142b7aace8 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts @@ -283,6 +283,17 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] { value: model.parallelism }, useYarnQueue(), + { + type: 'switch', + field: 'shutdownOnAttachedExit', + name: t('project.node.shutdown_on_attached_exit'), + span: 24, + props: { + 'checked-value': true, + 'unchecked-value': false + }, + value: false + }, { type: 'input', field: 'mainArgs',