From 6131e7ef6ac5bedef5f72dafb3f9ec1c4f30c876 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sun, 1 Feb 2026 20:23:47 +0800 Subject: [PATCH] Support configurable maximum runtime for workflow/task instance --- deploy/kubernetes/dolphinscheduler/README.md | 2 + .../kubernetes/dolphinscheduler/values.yaml | 4 + docs/docs/en/architecture/configuration.md | 38 ++++---- docs/docs/zh/architecture/configuration.md | 34 +++---- .../server/master/config/MasterConfig.java | 1 + .../MasterServerLoadProtectionConfig.java | 23 +++++ .../event/TaskTimeoutLifecycleEvent.java | 20 ++-- .../TaskStartLifecycleEventHandler.java | 19 +++- .../TaskTimeoutLifecycleEventHandler.java | 4 +- .../src/main/resources/application.yaml | 4 + .../cases/WorkflowStartTestCase.java | 31 +++++++ .../src/test/resources/application.yaml | 4 + ...orkflow_with_system_timeout_kill_task.yaml | 92 +++++++++++++++++++ 13 files changed, 228 insertions(+), 48 deletions(-) create mode 100644 dolphinscheduler-master/src/test/resources/it/start/workflow_with_system_timeout_kill_task.yaml diff --git a/deploy/kubernetes/dolphinscheduler/README.md b/deploy/kubernetes/dolphinscheduler/README.md index 0ca280604450..bfa4da6b5021 100644 --- a/deploy/kubernetes/dolphinscheduler/README.md +++ b/deploy/kubernetes/dolphinscheduler/README.md @@ -215,6 +215,8 @@ Please refer to the [Quick Start in Kubernetes](../../../docs/docs/en/guide/inst | master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_JVM_CPU_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Master max jvm cpu usage, when the master's jvm cpu usage is smaller then this value, master server can execute workflow. | | master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_CPU_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Master max system cpu usage, when the master's system cpu usage is smaller then this value, master server can execute workflow. | | master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Master max System memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow. | +| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_TASK_INSTANCE_RUNTIME | string | `"0d"` | Maximum allowed running time for a task instance. If the running duration exceeds this value, the instance will be killed. The default value of 0d indicates no limit. | +| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_WORKFLOW_INSTANCE_RUNTIME | string | `"0d"` | Maximum allowed running time for a workflow instance. If the running duration exceeds this value, the instance will be killed. The default value of 0d indicates no limit. | | master.env.MASTER_STATE_WHEEL_INTERVAL | string | `"5s"` | master state wheel interval, the unit is second | | master.env.MASTER_TASK_COMMIT_INTERVAL | string | `"1s"` | master commit task interval, the unit is second | | master.env.MASTER_TASK_COMMIT_RETRYTIMES | string | `"5"` | Master commit task retry times | diff --git a/deploy/kubernetes/dolphinscheduler/values.yaml b/deploy/kubernetes/dolphinscheduler/values.yaml index 3beefea212a1..c9d545896c39 100644 --- a/deploy/kubernetes/dolphinscheduler/values.yaml +++ b/deploy/kubernetes/dolphinscheduler/values.yaml @@ -565,6 +565,10 @@ master: MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7 # -- Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. MASTER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS: 0.7 + # -- Maximum allowed running time for a workflow instance. If the running duration exceeds this value, the instance will be killed. The default value of 0d indicates no limit. + MASTER_SERVER_LOAD_PROTECTION_MAX_WORKFLOW_INSTANCE_RUNTIME: 0d + # -- Maximum allowed running time for a task instance. If the running duration exceeds this value, the instance will be killed. The default value of 0d indicates no limit. + MASTER_SERVER_LOAD_PROTECTION_MAX_TASK_INSTANCE_RUNTIME: 0d # -- Master failover interval, the unit is minute MASTER_FAILOVER_INTERVAL: "10m" # -- Master kill application when handle failover diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md index 27c289bcecc2..a45048115dc8 100644 --- a/docs/docs/en/architecture/configuration.md +++ b/docs/docs/en/architecture/configuration.md @@ -275,24 +275,26 @@ Location: `api-server/conf/application.yaml` Location: `master-server/conf/application.yaml` -| Parameters | Default value | Description | -|-----------------------------------------------------------------------------|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------| -| master.listen-port | 5678 | master listen port | -| master.logic-task-config.task-executor-thread-count | 2 * CPU +1 | The thread size used to execute logic task | -| master.worker-load-balancer-configuration-properties.type | DYNAMIC_WEIGHTED_ROUND_ROBIN | Master will use the worker's cpu/memory/threadPool usage to calculate the worker load, the lower load will have more change to be dispatched task | -| master.max-heartbeat-interval | 10s | master max heartbeat interval | -| master.server-load-protection.enabled | true | If set true, will open master overload protection | -| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.8 | Master max system cpu usage, when the master's system cpu usage is smaller then this value, master server can execute workflow. | -| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.8 | Master max JVM cpu usage, when the master's jvm cpu usage is smaller then this value, master server can execute workflow. | -| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.8 | Master max system memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow. | -| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.8 | Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. | -| master.server-load-protection.max-concurrent-workflow-instances | 2147483647 | Master max concurrent workflow instances, when the master's workflow instance count reaches or exceeds this value, master server will be marked as busy. | -| master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory | -| master.command-fetch-strategy.type | ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED` | -| master.command-fetch-strategy.config.id-step | 1 | The id auto incremental step of t_ds_command in db | -| master.command-fetch-strategy.config.fetch-size | 10 | The number of commands fetched by master | -| master.task-dispatch-policy.dispatch-timeout-enabled | false | Indicates whether the dispatch timeout checking mechanism is enabled | -| master.task-dispatch-policy.max-task-dispatch-duration | 1h | The maximum allowed duration a task may wait in the dispatch queue before being assigned to a worker | +| Parameters | Default value | Description | +|-----------------------------------------------------------------------------|------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| master.listen-port | 5678 | master listen port | +| master.logic-task-config.task-executor-thread-count | 2 * CPU +1 | The thread size used to execute logic task | +| master.worker-load-balancer-configuration-properties.type | DYNAMIC_WEIGHTED_ROUND_ROBIN | Master will use the worker's cpu/memory/threadPool usage to calculate the worker load, the lower load will have more change to be dispatched task | +| master.max-heartbeat-interval | 10s | master max heartbeat interval | +| master.server-load-protection.enabled | true | If set true, will open master overload protection | +| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.8 | Master max system cpu usage, when the master's system cpu usage is smaller then this value, master server can execute workflow. | +| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.8 | Master max JVM cpu usage, when the master's jvm cpu usage is smaller then this value, master server can execute workflow. | +| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.8 | Master max system memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow. | +| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.8 | Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. | +| master.server-load-protection.max-concurrent-workflow-instances | 2147483647 | Master max concurrent workflow instances, when the master's workflow instance count reaches or exceeds this value, master server will be marked as busy. | +| master.server-load-protection.max-workflow-instance-runtime | 0m | Maximum allowed running time for a workflow instance. If the running duration exceeds this value, the instance will be kill. The default value of 0d indicates no limit, the min value is 1m. | +| master.server-load-protection.max-task-instance-runtime | 0m | Maximum allowed running time for a task instance. If the running duration exceeds this value, the instance will be kill. The default value of 0d indicates no limit, the min value is 1m. | +| master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory | +| master.command-fetch-strategy.type | ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED` | +| master.command-fetch-strategy.config.id-step | 1 | The id auto incremental step of t_ds_command in db | +| master.command-fetch-strategy.config.fetch-size | 10 | The number of commands fetched by master | +| master.task-dispatch-policy.dispatch-timeout-enabled | false | Indicates whether the dispatch timeout checking mechanism is enabled | +| master.task-dispatch-policy.max-task-dispatch-duration | 1h | The maximum allowed duration a task may wait in the dispatch queue before being assigned to a worker | ### Worker Server related configuration diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md index 80fed042f1aa..cc1e901ca9a2 100644 --- a/docs/docs/zh/architecture/configuration.md +++ b/docs/docs/zh/architecture/configuration.md @@ -305,22 +305,24 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId 位置:`worker-server/conf/application.yaml` -| 参数 | 默认值 | 描述 | -|-----------------------------------------------------------------------------|-----------|-----------------------------------------------------------------------------------------| -| worker.listen-port | 1234 | worker监听端口 | -| worker.max-heartbeat-interval | 10s | worker最大心跳间隔 | -| worker.host-weight | 100 | 派发任务时,worker主机的权重 | -| worker.tenant-auto-create | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 | -| worker.server-load-protection.enabled | true | 是否开启系统保护策略 | -| worker.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.8 | worker最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的操作系统CPU | -| worker.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.8 | worker最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的JVM CPU | -| worker.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.8 | worker最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的操作系统内存 | -| worker.server-load-protection.max-disk-usage-percentage-thresholds | 0.8 | worker最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的操作系统磁盘空间 | -| worker.alert-listen-host | localhost | alert监听host | -| worker.alert-listen-port | 50052 | alert监听端口 | -| worker.physical-task-config.task-executor-thread-size | 100 | Worker中任务最大并发度 | -| worker.tenant-config.auto-create-tenant-enabled | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 | -| worker.tenant-config.default-tenant-enabled | false | 如果设置为true, 将会使用worker服务启动用户作为 `default` 租户。 | +| 默认值 | 参数 | 描述 | +|-----------|-----------------------------------------------------------------------------|-----------------------------------------------------------------------------------------| +| 1234 | worker.listen-port | worker监听端口 | +| 10s | worker.max-heartbeat-interval | worker最大心跳间隔 | +| 100 | worker.host-weight | 派发任务时,worker主机的权重 | +| true | worker.tenant-auto-create | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 | +| true | worker.server-load-protection.enabled | 是否开启系统保护策略 | +| 0.8 | worker.server-load-protection.max-system-cpu-usage-percentage-thresholds | worker最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的操作系统CPU | +| 0.8 | worker.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | worker最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的JVM CPU | +| 0.8 | worker.server-load-protection.max-system-memory-usage-percentage-thresholds | worker最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的操作系统内存 | +| 0.8 | worker.server-load-protection.max-disk-usage-percentage-thresholds | worker最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的操作系统磁盘空间 | +| 0m | master.server-load-protection.max-workflow-instance-runtime | 一个工作流实例最大的运行时间,如果超过这个时间,实例会被kill。 默认值为 0d 表示没有限制, 最小值为1分钟。 | +| 0m | master.server-load-protection.max-task-instance-runtime | 一个任务实例最大的运行时间,如果超过这个时间,实例会被kill。 默认值为 0d 表示没有限制, 最小值为1分钟。 | +| localhost | worker.alert-listen-host | alert监听host | +| 50052 | worker.alert-listen-port | alert监听端口 | +| 100 | worker.physical-task-config.task-executor-thread-size | Worker中任务最大并发度 | +| true | worker.tenant-config.auto-create-tenant-enabled | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 | +| false | worker.tenant-config.default-tenant-enabled | 如果设置为true, 将会使用worker服务启动用户作为 `default` 租户。 | ## Alert Server相关配置 diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index b486b38092fa..4d2ecc37138d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -114,6 +114,7 @@ public void validate(Object target, Errors errors) { if (StringUtils.isEmpty(masterConfig.getMasterAddress())) { masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort())); } + serverLoadProtection.validate(errors); commandFetchStrategy.validate(errors); workerLoadBalancerConfigurationProperties.validate(errors); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java index c1cda12ce072..0a11b8bfb84a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java @@ -19,13 +19,36 @@ import org.apache.dolphinscheduler.meter.metrics.BaseServerLoadProtectionConfig; +import java.time.Duration; + import lombok.Data; import lombok.EqualsAndHashCode; +import org.springframework.validation.Errors; + @Data @EqualsAndHashCode(callSuper = true) public class MasterServerLoadProtectionConfig extends BaseServerLoadProtectionConfig { private int maxConcurrentWorkflowInstances = Integer.MAX_VALUE; + private Duration maxWorkflowInstanceRuntime = Duration.ofDays(0); + + private Duration maxTaskInstanceRuntime = Duration.ofDays(0); + + public void validate(Errors errors) { + if (maxConcurrentWorkflowInstances <= 0) { + errors.rejectValue("maxConcurrentWorkflowInstances", null, + "maxConcurrentWorkflowInstances must be greater than 0"); + } + if (!maxWorkflowInstanceRuntime.isZero() && + maxWorkflowInstanceRuntime.compareTo(Duration.ofMinutes(1)) < 0) { + errors.rejectValue("maxWorkflowInstanceRuntime", null, + "maxWorkflowInstanceRuntime must be 0 (disabled) or >= 1m"); + } + if (!maxTaskInstanceRuntime.isZero() && + maxTaskInstanceRuntime.compareTo(Duration.ofMinutes(1)) < 0) { + errors.rejectValue("maxTaskInstanceRuntime", null, "maxTaskInstanceRuntime must be 0 (disabled) or >= 1m"); + } + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java index d6c7348b7f89..5ce6d109a62e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java @@ -19,8 +19,8 @@ import static com.google.common.base.Preconditions.checkState; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType; @@ -35,23 +35,27 @@ public class TaskTimeoutLifecycleEvent extends AbstractTaskLifecycleEvent { private final ITaskExecutionRunnable taskExecutionRunnable; + private final TaskTimeoutStrategy timeoutStrategy; + protected TaskTimeoutLifecycleEvent(final ITaskExecutionRunnable taskExecutionRunnable, + final TaskTimeoutStrategy timeoutStrategy, final long timeout) { super(timeout); + this.timeoutStrategy = timeoutStrategy; this.taskExecutionRunnable = taskExecutionRunnable; } - public static TaskTimeoutLifecycleEvent of(final ITaskExecutionRunnable taskExecutionRunnable) { - final TaskDefinition taskDefinition = taskExecutionRunnable.getTaskDefinition(); + public static TaskTimeoutLifecycleEvent of(final ITaskExecutionRunnable taskExecutionRunnable, + final TaskTimeoutStrategy timeoutStrategy, + final long timeoutInMinutes) { final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); - checkState(taskDefinition != null, "The task instance must be initialized before retrying."); - final int timeout = taskDefinition.getTimeout(); - checkState(timeout >= 0, "The task timeout: %s must >=0 minutes", timeout); + checkState(timeoutStrategy != null, "The task timeoutStrategy must not be null"); + checkState(timeoutInMinutes >= 0, "The task timeout: %s must >=0 minutes", timeoutInMinutes); long delayTime = System.currentTimeMillis() - taskInstance.getSubmitTime().getTime() - + TimeUnit.MINUTES.toMillis(timeout); - return new TaskTimeoutLifecycleEvent(taskExecutionRunnable, delayTime); + + TimeUnit.MINUTES.toMillis(timeoutInMinutes); + return new TaskTimeoutLifecycleEvent(taskExecutionRunnable, timeoutStrategy, delayTime); } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskStartLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskStartLifecycleEventHandler.java index fc86fe74b93d..5173df3bfadb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskStartLifecycleEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskStartLifecycleEventHandler.java @@ -18,6 +18,8 @@ package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.handler; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType; import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskStartLifecycleEvent; @@ -28,12 +30,16 @@ import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class TaskStartLifecycleEventHandler extends AbstractTaskLifecycleEventHandler { + @Autowired + private MasterConfig masterConfig; + @Override public void handle(final IWorkflowExecutionRunnable workflowExecutionRunnable, final TaskStartLifecycleEvent taskStartLifecycleEvent) { @@ -63,13 +69,20 @@ public ILifecycleEventType matchEventType() { private void taskTimeoutMonitor(final ITaskExecutionRunnable taskExecutionRunnable) { final TaskDefinition taskDefinition = taskExecutionRunnable.getTaskDefinition(); - if (taskDefinition.getTimeout() <= 0) { + int taskTimeout = taskDefinition.getTimeout(); + if (taskTimeout > 0 && taskDefinition.getTimeoutNotifyStrategy() != null) { log.debug("The task {} timeout {} is invalided, so the timeout monitor will not be started.", taskDefinition.getName(), taskDefinition.getTimeout()); - return; + taskExecutionRunnable.getWorkflowEventBus().publish(TaskTimeoutLifecycleEvent.of( + taskExecutionRunnable, taskDefinition.getTimeoutNotifyStrategy(), taskTimeout)); + } + + int systemTimeout = (int) masterConfig.getServerLoadProtection().getMaxTaskInstanceRuntime().toMinutes(); + if (systemTimeout > 0) { + taskExecutionRunnable.getWorkflowEventBus().publish(TaskTimeoutLifecycleEvent.of( + taskExecutionRunnable, TaskTimeoutStrategy.FAILED, systemTimeout)); } - taskExecutionRunnable.getWorkflowEventBus().publish(TaskTimeoutLifecycleEvent.of(taskExecutionRunnable)); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java index 5dfb0ec8397e..419225fa9250 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.handler; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; @@ -55,9 +54,8 @@ public void handle(final ITaskStateAction taskStateAction, // The task instance is not active, means it is already finished. return; } - final TaskDefinition taskDefinition = taskExecutionRunnable.getTaskDefinition(); final String taskName = taskExecutionRunnable.getName(); - final TaskTimeoutStrategy timeoutNotifyStrategy = taskDefinition.getTimeoutNotifyStrategy(); + final TaskTimeoutStrategy timeoutNotifyStrategy = taskTimeoutLifecycleEvent.getTimeoutStrategy(); if (timeoutNotifyStrategy == null) { log.info("The task {} TimeoutStrategy is null.", taskName); return; diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index 81c7ae3aed6f..29eff533c86f 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -110,6 +110,10 @@ master: max-disk-usage-percentage-thresholds: 0.8 # Master max concurrent workflow instances, when the master's workflow instance count exceeds this value, master server will be marked as busy. max-concurrent-workflow-instances: 2147483647 + # Maximum allowed running time for a workflow instance. If the running duration exceeds this value, the instance will be kill. The default value of 0d indicates no limit, the min value is 1m. + max-workflow-instance-runtime: 0d + # Maximum allowed running time for a task instance. If the running duration exceeds this value, the instance will be kill. The default value of 0d indicates no limit, the min value is 1m. + max-task-instance-runtime: 0d worker-group-refresh-interval: 5m # Task dispatch timeout check (currently disabled). # When enabled, tasks not dispatched within this duration are marked as failed. diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java index 6277351c295c..e9c8c2a8b2f8 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java @@ -1288,6 +1288,37 @@ public void testStartWorkflow_withTimeoutKillTask() { masterContainer.assertAllResourceReleased(); } + @Test + @DisplayName("Test start a workflow which contains a dep task will be kill by system timeout") + public void testStartWorkflow_withSystemTimeoutKillTask() { + final String yaml = "/it/start/workflow_with_system_timeout_kill_task.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_kill_task"); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(workflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .build(); + workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + await() + .atMost(Duration.ofSeconds(90)) + .untilAsserted(() -> { + Assertions + .assertThat(repository.queryWorkflowInstance(workflow)) + .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState()) + .isEqualTo(WorkflowExecutionStatus.STOP)); + Assertions + .assertThat(repository.queryTaskInstance(workflow)) + .hasSize(1) + .satisfiesExactly(taskInstance -> { + assertThat(taskInstance.getName()).isEqualTo("dep_task_with_timeout_killed"); + assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL); + }); + }); + masterContainer.assertAllResourceReleased(); + } + @Test @DisplayName("Test start a workflow with task depend type TASK_ONLY") public void testStartWorkflow_withTaskOnlyStrategy() { diff --git a/dolphinscheduler-master/src/test/resources/application.yaml b/dolphinscheduler-master/src/test/resources/application.yaml index f485dd0c5fb3..cc5bf90fedfb 100644 --- a/dolphinscheduler-master/src/test/resources/application.yaml +++ b/dolphinscheduler-master/src/test/resources/application.yaml @@ -64,6 +64,10 @@ master: max-system-memory-usage-percentage-thresholds: 0.9 # Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. max-disk-usage-percentage-thresholds: 0.9 + # Maximum allowed running time for a workflow instance. If the running duration exceeds this value, the instance will be kill. The default value of 0d indicates no limit, the min value is 1m. + max-workflow-instance-runtime: 1m + # Maximum allowed running time for a task instance. If the running duration exceeds this value, the instance will be kill. The default value of 0d indicates no limit, the min value is 1m. + max-task-instance-runtime: 1m worker-load-balancer-configuration-properties: # RANDOM, ROUND_ROBIN, FIXED_WEIGHTED_ROUND_ROBIN, DYNAMIC_WEIGHTED_ROUND_ROBIN type: DYNAMIC_WEIGHTED_ROUND_ROBIN diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_system_timeout_kill_task.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_system_timeout_kill_task.yaml new file mode 100644 index 000000000000..a3c32ed997ce --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_system_timeout_kill_task.yaml @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflows: + - name: workflow_with_one_fake_task_success + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with single task + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + - name: workflow_with_timeout_kill_task + code: 2 + version: 1 + projectCode: 1 + description: This is a fake workflow with single timeout task + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + +tasks: + - name: B + code: 1 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":[],"shellScript":"if [ \"${system.project.name}\" = \"MasterIntegrationTest\" ]; then\n exit 0 \nelse\n exit 1\nfi","resourceList":[]}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + - name: dep_task_with_timeout_killed + code: 2 + version: 1 + projectCode: 1 + userId: 1 + taskType: DEPENDENT + taskParams: '{"localParams":[],"resourceList":[],"dependence":{"checkInterval":10,"failurePolicy":"DEPENDENT_FAILURE_FAILURE","relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"dependentType":"DEPENDENT_ON_WORKFLOW","projectCode":1,"definitionCode":1,"depTaskCode":0,"cycle":"day","dateValue":"last1Days"}]}]}}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + - projectCode: 1 + workflowDefinitionCode: 2 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 2 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 +