diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java index b284a4cd0967..991b3effd76a 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java @@ -18,10 +18,16 @@ package org.apache.dolphinscheduler.dao.repository.impl; import org.apache.dolphinscheduler.common.enums.AlertStatus; +import org.apache.dolphinscheduler.common.enums.AlertType; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.BaseDaoTest; import org.apache.dolphinscheduler.dao.entity.Alert; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import java.util.Date; import java.util.List; import org.junit.jupiter.api.Assertions; @@ -66,4 +72,49 @@ void testSendServerStoppedAlert() { .count(); Assertions.assertEquals(1L, count); } + + @Test + void testSendWorkflowTimeoutAlert() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow-timeout"); + workflowInstance.setWorkflowDefinitionCode(100L); + workflowInstance.setCommandType(CommandType.START_PROCESS); + workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); + workflowInstance.setStartTime(new Date()); + workflowInstance.setHost("localhost"); + workflowInstance.setWarningGroupId(1); + + ProjectUser projectUser = new ProjectUser(); + projectUser.setProjectCode(1L); + projectUser.setProjectName("test-project"); + projectUser.setUserName("admin"); + + alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser); + + List alerts = alertDao.listPendingAlerts(-1); + Assertions.assertNotNull(alerts); + + long timeoutAlertCount = alerts.stream() + .filter(alert -> AlertType.WORKFLOW_INSTANCE_TIMEOUT.equals(alert.getAlertType())) + .filter(alert -> alert.getWorkflowInstanceId() != null + && alert.getWorkflowInstanceId().equals(workflowInstance.getId())) + .count(); + Assertions.assertEquals(1L, timeoutAlertCount); + + Alert timeoutAlert = alerts.stream() + .filter(alert -> AlertType.WORKFLOW_INSTANCE_TIMEOUT.equals(alert.getAlertType())) + .filter(alert -> alert.getWorkflowInstanceId() != null + && alert.getWorkflowInstanceId().equals(workflowInstance.getId())) + .findFirst() + .orElse(null); + + Assertions.assertNotNull(timeoutAlert); + Assertions.assertEquals("Workflow Timeout Warn", timeoutAlert.getTitle()); + Assertions.assertEquals(projectUser.getProjectCode(), timeoutAlert.getProjectCode()); + Assertions.assertEquals(workflowInstance.getWorkflowDefinitionCode(), + timeoutAlert.getWorkflowDefinitionCode()); + Assertions.assertEquals(workflowInstance.getId(), timeoutAlert.getWorkflowInstanceId()); + Assertions.assertTrue(timeoutAlert.getContent().contains("test-workflow-timeout")); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java index 95070d62296e..833b1f7508fd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java @@ -57,5 +57,9 @@ public enum WorkflowLifecycleEventType implements ILifecycleEventType { * Finalize the workflow instance. */ FINALIZE, + /** + * The workflow instance timeout + */ + TIMEOUT, } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java new file mode 100644 index 000000000000..23322c72b6fa --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.AbstractWorkflowLifecycleLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; + +import java.util.concurrent.TimeUnit; + +public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent { + + private final IWorkflowExecutionRunnable workflowExecutionRunnable; + + protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable, + final long timeout) { + super(timeout); + this.workflowExecutionRunnable = workflowExecutionRunnable; + } + + @Override + public IWorkflowExecutionRunnable getWorkflowExecutionRunnable() { + return workflowExecutionRunnable; + } + + public static WorkflowTimeoutLifecycleEvent of(final IWorkflowExecutionRunnable workflowExecutionRunnable) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + checkState(workflowInstance != null, "The workflow instance must be initialized before timeout monitoring."); + + final int timeout = workflowInstance.getTimeout(); + checkState(timeout >= 0, "The workflow timeout: %s must >=0 minutes", timeout); + + // Calculate remaining time until timeout: timeout - elapsed time + long delayTime = TimeUnit.MINUTES.toMillis(timeout) + - (System.currentTimeMillis() - workflowInstance.getStartTime().getTime()); + return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime); + } + + @Override + public ILifecycleEventType getEventType() { + return WorkflowLifecycleEventType.TIMEOUT; + } + + @Override + public String toString() { + return "WorkflowTimeoutLifecycleEvent{" + + "workflow=" + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getName() + + ", timeout=" + delayTime + + '}'; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java index 2a3aba95e31a..adddd4f94d38 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java @@ -17,9 +17,11 @@ package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; @@ -37,7 +39,7 @@ public class WorkflowStartLifecycleEventHandler public void handle(final IWorkflowStateAction workflowStateAction, final IWorkflowExecutionRunnable workflowExecutionRunnable, final WorkflowStartLifecycleEvent workflowStartEvent) { - + workflowTimeoutMonitor(workflowExecutionRunnable); workflowStateAction.onStartEvent(workflowExecutionRunnable, workflowStartEvent); } @@ -45,4 +47,17 @@ public void handle(final IWorkflowStateAction workflowStateAction, public ILifecycleEventType matchEventType() { return WorkflowLifecycleEventType.START; } + + private void workflowTimeoutMonitor(final IWorkflowExecutionRunnable workflowExecutionRunnable) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + if (workflowInstance.getTimeout() <= 0) { + log.debug("The workflow {} timeout {} is not configured, skip timeout monitor.", + workflowInstance.getName(), + workflowInstance.getTimeout()); + return; + } + workflowExecutionRunnable.getWorkflowEventBus() + .publish(WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable)); + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java new file mode 100644 index 000000000000..6c5845408fd4 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; +import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowTimeoutLifecycleEventHandler + extends + AbstractWorkflowLifecycleEventHandler { + + private final WorkflowAlertManager workflowAlertManager; + + public WorkflowTimeoutLifecycleEventHandler(final WorkflowAlertManager workflowAlertManager) { + this.workflowAlertManager = workflowAlertManager; + } + + @Override + public void handle(final IWorkflowStateAction workflowStateAction, + final IWorkflowExecutionRunnable workflowExecutionRunnable, + final WorkflowTimeoutLifecycleEvent workflowTimeoutEvent) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + final String workflowName = workflowExecutionRunnable.getName(); + + if (workflowInstance.getState().isFinalState()) { + log.info("The workflow {} has been finished with state: {}, skip timeout alert.", + workflowName, + workflowInstance.getState().name()); + return; + } + + + log.info("The workflow {} has timeout, try to send a timeout alert.", workflowName); + doWorkflowTimeoutAlert(workflowInstance); + } + + private void doWorkflowTimeoutAlert(final WorkflowInstance workflowInstance) { + workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance); + } + + @Override + public ILifecycleEventType matchEventType() { + return WorkflowLifecycleEventType.TIMEOUT; + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java index bbe3ec3b07b7..9f1f3a302e1c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java @@ -103,15 +103,7 @@ public String getContentWorkflowInstance(WorkflowInstance workflowInstance, Project project) { String res; - WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao - .queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(), - workflowInstance.getWorkflowDefinitionVersion()); - - String modifyBy = ""; - if (workflowDefinitionLog != null) { - User operator = userDao.queryById(workflowDefinitionLog.getOperator()); - modifyBy = operator == null ? "" : operator.getUserName(); - } + String modifyBy = queryWorkflowOperator(workflowInstance); List successTaskList = new ArrayList<>(1); WorkflowAlertContent workflowAlertContent = WorkflowAlertContent.builder() @@ -136,6 +128,19 @@ public String getContentWorkflowInstance(WorkflowInstance workflowInstance, return res; } + private String queryWorkflowOperator(WorkflowInstance workflowInstance) { + WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao + .queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(), + workflowInstance.getWorkflowDefinitionVersion()); + + String modifyBy = ""; + if (workflowDefinitionLog != null) { + User operator = userDao.queryById(workflowDefinitionLog.getOperator()); + modifyBy = operator == null ? "" : operator.getUserName(); + } + return modifyBy; + } + /** * getting worker fault tolerant content * @@ -147,14 +152,7 @@ private String getWorkerToleranceContent(WorkflowInstance workflowInstance, List List toleranceTaskInstanceList = new ArrayList<>(); - WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao - .queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(), - workflowInstance.getWorkflowDefinitionVersion()); - String modifyBy = ""; - if (workflowDefinitionLog != null) { - User operator = userDao.queryById(workflowDefinitionLog.getOperator()); - modifyBy = operator == null ? "" : operator.getUserName(); - } + String modifyBy = queryWorkflowOperator(workflowInstance); for (TaskInstance taskInstance : toleranceTaskList) { WorkflowAlertContent workflowAlertContent = WorkflowAlertContent.builder() @@ -264,4 +262,9 @@ public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser = projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId()); alertDao.sendTaskTimeoutAlert(workflowInstance, taskInstance, projectUser); } + + public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance) { + ProjectUser projectUser = projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId()); + alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser); + } }