From 0b705ad113be19a97f66aa34b3d094c23c425f4f Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Wed, 24 Dec 2025 09:57:58 +0800 Subject: [PATCH 1/9] add workflow timeout --- .../apache/dolphinscheduler/dao/AlertDao.java | 7 +- .../dao/repository/impl/AlertDaoTest.java | 51 +++++++ .../lifecycle/WorkflowLifecycleEventType.java | 4 + .../event/WorkflowTimeoutLifecycleEvent.java | 70 +++++++++ .../WorkflowStartLifecycleEventHandler.java | 17 ++- .../WorkflowTimeoutLifecycleEventHandler.java | 72 ++++++++++ .../WorkflowTimeoutLifecycleEventTest.java | 135 ++++++++++++++++++ ...orkflowStartLifecycleEventHandlerTest.java | 114 +++++++++++++++ ...kflowTimeoutLifecycleEventHandlerTest.java | 128 +++++++++++++++++ .../service/alert/WorkflowAlertManager.java | 47 +++--- 10 files changed, 625 insertions(+), 20 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java index 708c8ad6b8a0..9cc959af27f1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java @@ -192,9 +192,10 @@ public void sendServerStoppedAlert(String host, String serverType) { * workflow time out alert * * @param workflowInstance workflowInstance - * @param projectUser projectUser + * @param projectUser projectUser + * @param modifyBy modifyBy */ - public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser) { + public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser, String modifyBy) { int alertGroupId = workflowInstance.getWarningGroupId(); Alert alert = new Alert(); List workflowAlertContentList = new ArrayList<>(1); @@ -207,6 +208,8 @@ public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectU .workflowInstanceName(workflowInstance.getName()) .commandType(workflowInstance.getCommandType()) .workflowExecutionStatus(workflowInstance.getState()) + .modifyBy(modifyBy) + .recovery(workflowInstance.getRecovery()) .runTimes(workflowInstance.getRunTimes()) .workflowStartTime(workflowInstance.getStartTime()) .workflowHost(workflowInstance.getHost()) diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java index b284a4cd0967..c8033b5b3fc8 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java @@ -18,10 +18,16 @@ package org.apache.dolphinscheduler.dao.repository.impl; import org.apache.dolphinscheduler.common.enums.AlertStatus; +import org.apache.dolphinscheduler.common.enums.AlertType; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.BaseDaoTest; import org.apache.dolphinscheduler.dao.entity.Alert; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import java.util.Date; import java.util.List; import org.junit.jupiter.api.Assertions; @@ -66,4 +72,49 @@ void testSendServerStoppedAlert() { .count(); Assertions.assertEquals(1L, count); } + + @Test + void testSendWorkflowTimeoutAlert() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow-timeout"); + workflowInstance.setWorkflowDefinitionCode(100L); + workflowInstance.setCommandType(CommandType.START_PROCESS); + workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); + workflowInstance.setStartTime(new Date()); + workflowInstance.setHost("localhost"); + workflowInstance.setWarningGroupId(1); + + ProjectUser projectUser = new ProjectUser(); + projectUser.setProjectCode(1L); + projectUser.setProjectName("test-project"); + projectUser.setUserName("admin"); + + alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser, "admin"); + + List alerts = alertDao.listPendingAlerts(-1); + Assertions.assertNotNull(alerts); + + long timeoutAlertCount = alerts.stream() + .filter(alert -> AlertType.WORKFLOW_INSTANCE_TIMEOUT.equals(alert.getAlertType())) + .filter(alert -> alert.getWorkflowInstanceId() != null + && alert.getWorkflowInstanceId().equals(workflowInstance.getId())) + .count(); + Assertions.assertEquals(1L, timeoutAlertCount); + + Alert timeoutAlert = alerts.stream() + .filter(alert -> AlertType.WORKFLOW_INSTANCE_TIMEOUT.equals(alert.getAlertType())) + .filter(alert -> alert.getWorkflowInstanceId() != null + && alert.getWorkflowInstanceId().equals(workflowInstance.getId())) + .findFirst() + .orElse(null); + + Assertions.assertNotNull(timeoutAlert); + Assertions.assertEquals("Workflow Timeout Warn", timeoutAlert.getTitle()); + Assertions.assertEquals(projectUser.getProjectCode(), timeoutAlert.getProjectCode()); + Assertions.assertEquals(workflowInstance.getWorkflowDefinitionCode(), + timeoutAlert.getWorkflowDefinitionCode()); + Assertions.assertEquals(workflowInstance.getId(), timeoutAlert.getWorkflowInstanceId()); + Assertions.assertTrue(timeoutAlert.getContent().contains("test-workflow-timeout")); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java index 95070d62296e..833b1f7508fd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java @@ -57,5 +57,9 @@ public enum WorkflowLifecycleEventType implements ILifecycleEventType { * Finalize the workflow instance. */ FINALIZE, + /** + * The workflow instance timeout + */ + TIMEOUT, } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java new file mode 100644 index 000000000000..ad9eb8bc0512 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.AbstractWorkflowLifecycleLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; + +import java.util.concurrent.TimeUnit; + +import lombok.Getter; + +@Getter +public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent { + + private final IWorkflowExecutionRunnable workflowExecutionRunnable; + + protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable, + final long timeout) { + super(timeout); + this.workflowExecutionRunnable = workflowExecutionRunnable; + } + + public static WorkflowTimeoutLifecycleEvent of(final IWorkflowExecutionRunnable workflowExecutionRunnable) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + checkState(workflowInstance != null, "The workflow instance must be initialized before timeout monitoring."); + + final int timeout = workflowInstance.getTimeout(); + checkState(timeout >= 0, "The workflow timeout: %s must >=0 minutes", timeout); + + // Calculate remaining time until timeout: timeout - elapsed time + long delayTime = TimeUnit.MINUTES.toMillis(timeout) + - (System.currentTimeMillis() - workflowInstance.getStartTime().getTime()); + // Ensure delayTime is not negative (trigger immediately if already timeout) + delayTime = Math.max(0, delayTime); + return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime); + } + + @Override + public ILifecycleEventType getEventType() { + return WorkflowLifecycleEventType.TIMEOUT; + } + + @Override + public String toString() { + return "WorkflowTimeoutLifecycleEvent{" + + "workflow=" + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getName() + + ", timeout=" + delayTime + + '}'; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java index 2a3aba95e31a..adddd4f94d38 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java @@ -17,9 +17,11 @@ package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; @@ -37,7 +39,7 @@ public class WorkflowStartLifecycleEventHandler public void handle(final IWorkflowStateAction workflowStateAction, final IWorkflowExecutionRunnable workflowExecutionRunnable, final WorkflowStartLifecycleEvent workflowStartEvent) { - + workflowTimeoutMonitor(workflowExecutionRunnable); workflowStateAction.onStartEvent(workflowExecutionRunnable, workflowStartEvent); } @@ -45,4 +47,17 @@ public void handle(final IWorkflowStateAction workflowStateAction, public ILifecycleEventType matchEventType() { return WorkflowLifecycleEventType.START; } + + private void workflowTimeoutMonitor(final IWorkflowExecutionRunnable workflowExecutionRunnable) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + if (workflowInstance.getTimeout() <= 0) { + log.debug("The workflow {} timeout {} is not configured, skip timeout monitor.", + workflowInstance.getName(), + workflowInstance.getTimeout()); + return; + } + workflowExecutionRunnable.getWorkflowEventBus() + .publish(WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable)); + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java new file mode 100644 index 000000000000..1b6a10600be7 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; +import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowTimeoutLifecycleEventHandler + extends + AbstractWorkflowLifecycleEventHandler { + + private final WorkflowAlertManager workflowAlertManager; + + public WorkflowTimeoutLifecycleEventHandler(final WorkflowAlertManager workflowAlertManager) { + this.workflowAlertManager = workflowAlertManager; + } + + @Override + public void handle(final IWorkflowStateAction workflowStateAction, + final IWorkflowExecutionRunnable workflowExecutionRunnable, + final WorkflowTimeoutLifecycleEvent workflowTimeoutEvent) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + final String workflowName = workflowExecutionRunnable.getName(); + + // Check if workflow is still active (not finished) + if (workflowInstance.getState().isFinalState()) { + log.info("The workflow {} has been finished with state: {}, skip timeout alert.", + workflowName, + workflowInstance.getState().name()); + return; + } + + log.info("The workflow {} has timeout, try to send a timeout alert.", workflowName); + doWorkflowTimeoutAlert(workflowInstance); + } + + private void doWorkflowTimeoutAlert(final WorkflowInstance workflowInstance) { + // ProjectUser will be built in WorkflowAlertManager + workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance, null); + } + + @Override + public ILifecycleEventType matchEventType() { + return WorkflowLifecycleEventType.TIMEOUT; + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java new file mode 100644 index 000000000000..2288f51ca127 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext; + +import java.util.Date; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class WorkflowTimeoutLifecycleEventTest { + + @Test + void testCreateEventWithValidTimeout() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(60); + workflowInstance.setStartTime(new Date(System.currentTimeMillis() - 30 * 60 * 1000)); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); + + assertNotNull(event); + assertEquals(WorkflowLifecycleEventType.TIMEOUT, event.getEventType()); + assertEquals(workflowExecutionRunnable, event.getWorkflowExecutionRunnable()); + } + + @Test + void testCreateEventWithZeroTimeout() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(0); + workflowInstance.setStartTime(new Date()); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); + + assertNotNull(event); + } + + @Test + void testCreateEventWithAlreadyTimeout() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(30); + workflowInstance.setStartTime(new Date(System.currentTimeMillis() - 60 * 60 * 1000)); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); + + assertNotNull(event); + } + + @Test + void testCreateEventWithNullWorkflowInstance() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(null); + + assertThrows(IllegalStateException.class, + () -> WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable)); + } + + @Test + void testCreateEventWithNegativeTimeout() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(-1); + workflowInstance.setStartTime(new Date()); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + assertThrows(IllegalStateException.class, + () -> WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable)); + } + + @Test + void testToString() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(60); + workflowInstance.setStartTime(new Date()); + + WorkflowExecuteContext context = mock(WorkflowExecuteContext.class); + when(context.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getWorkflowExecuteContext()).thenReturn(context); + + WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); + + String toString = event.toString(); + assertTrue(toString.contains("WorkflowTimeoutLifecycleEvent")); + assertTrue(toString.contains("test-workflow")); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java new file mode 100644 index 000000000000..2ca529a6b6af --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; + +import java.util.Date; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class WorkflowStartLifecycleEventHandlerTest { + + @Mock + private IWorkflowStateAction workflowStateAction; + + @Mock + private IWorkflowExecutionRunnable workflowExecutionRunnable; + + @Mock + private WorkflowStartLifecycleEvent workflowStartEvent; + + @Mock + private WorkflowEventBus workflowEventBus; + + private WorkflowStartLifecycleEventHandler handler; + + @BeforeEach + void setUp() { + handler = new WorkflowStartLifecycleEventHandler(); + } + + @Test + void testMatchEventType() { + assertEquals(WorkflowLifecycleEventType.START, handler.matchEventType()); + } + + @Test + void testHandleWithTimeoutConfigured() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(60); + workflowInstance.setStartTime(new Date()); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getWorkflowEventBus()).thenReturn(workflowEventBus); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowStartEvent); + + verify(workflowEventBus).publish(any(WorkflowTimeoutLifecycleEvent.class)); + verify(workflowStateAction).onStartEvent(workflowExecutionRunnable, workflowStartEvent); + } + + @Test + void testHandleWithNoTimeout() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(0); + workflowInstance.setStartTime(new Date()); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowStartEvent); + + verify(workflowStateAction).onStartEvent(workflowExecutionRunnable, workflowStartEvent); + } + + @Test + void testHandleWithNegativeTimeout() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(-1); + workflowInstance.setStartTime(new Date()); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowStartEvent); + + verify(workflowStateAction).onStartEvent(workflowExecutionRunnable, workflowStartEvent); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java new file mode 100644 index 000000000000..ccd8b15d4dfe --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; +import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class WorkflowTimeoutLifecycleEventHandlerTest { + + @Mock + private WorkflowAlertManager workflowAlertManager; + + @Mock + private IWorkflowStateAction workflowStateAction; + + @Mock + private IWorkflowExecutionRunnable workflowExecutionRunnable; + + @Mock + private WorkflowTimeoutLifecycleEvent workflowTimeoutEvent; + + private WorkflowTimeoutLifecycleEventHandler handler; + + @BeforeEach + void setUp() { + handler = new WorkflowTimeoutLifecycleEventHandler(workflowAlertManager); + } + + @Test + void testMatchEventType() { + assertEquals(WorkflowLifecycleEventType.TIMEOUT, handler.matchEventType()); + } + + @Test + void testHandleWorkflowTimeoutWithRunningWorkflow() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); + workflowInstance.setWarningGroupId(1); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); + + verify(workflowAlertManager).sendWorkflowTimeoutAlert(eq(workflowInstance), eq(null)); + } + + @Test + void testHandleWorkflowTimeoutWithFinishedWorkflow() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setState(WorkflowExecutionStatus.SUCCESS); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); + + verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any(), any()); + } + + @Test + void testHandleWorkflowTimeoutWithFailedWorkflow() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setState(WorkflowExecutionStatus.FAILURE); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); + + verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any(), any()); + } + + @Test + void testHandleWorkflowTimeoutWithStoppedWorkflow() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setState(WorkflowExecutionStatus.STOP); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); + + verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any(), any()); + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java index bbe3ec3b07b7..ac0024d1994c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Objects; import lombok.extern.slf4j.Slf4j; @@ -103,15 +104,7 @@ public String getContentWorkflowInstance(WorkflowInstance workflowInstance, Project project) { String res; - WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao - .queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(), - workflowInstance.getWorkflowDefinitionVersion()); - - String modifyBy = ""; - if (workflowDefinitionLog != null) { - User operator = userDao.queryById(workflowDefinitionLog.getOperator()); - modifyBy = operator == null ? "" : operator.getUserName(); - } + String modifyBy = queryWorkflowOperator(workflowInstance); List successTaskList = new ArrayList<>(1); WorkflowAlertContent workflowAlertContent = WorkflowAlertContent.builder() @@ -136,6 +129,19 @@ public String getContentWorkflowInstance(WorkflowInstance workflowInstance, return res; } + private String queryWorkflowOperator(WorkflowInstance workflowInstance) { + WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao + .queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(), + workflowInstance.getWorkflowDefinitionVersion()); + + String modifyBy = ""; + if (workflowDefinitionLog != null) { + User operator = userDao.queryById(workflowDefinitionLog.getOperator()); + modifyBy = operator == null ? "" : operator.getUserName(); + } + return modifyBy; + } + /** * getting worker fault tolerant content * @@ -147,14 +153,7 @@ private String getWorkerToleranceContent(WorkflowInstance workflowInstance, List List toleranceTaskInstanceList = new ArrayList<>(); - WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao - .queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(), - workflowInstance.getWorkflowDefinitionVersion()); - String modifyBy = ""; - if (workflowDefinitionLog != null) { - User operator = userDao.queryById(workflowDefinitionLog.getOperator()); - modifyBy = operator == null ? "" : operator.getUserName(); - } + String modifyBy = queryWorkflowOperator(workflowInstance); for (TaskInstance taskInstance : toleranceTaskList) { WorkflowAlertContent workflowAlertContent = WorkflowAlertContent.builder() @@ -264,4 +263,18 @@ public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser = projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId()); alertDao.sendTaskTimeoutAlert(workflowInstance, taskInstance, projectUser); } + + public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser) { + // Get workflow definition log for modifyBy + String modifyBy = queryWorkflowOperator(workflowInstance); + if (Objects.isNull(projectUser)) { + Project project = projectDao.queryByCode(workflowInstance.getProjectCode()); + projectUser = new ProjectUser(); + projectUser.setProjectCode(project.getCode()); + projectUser.setProjectName(project.getName()); + projectUser.setUserName(project.getUserName()); + } + + alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser, modifyBy); + } } From 3de2674cc84c89edf750a18fde9654d79850b99c Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Wed, 24 Dec 2025 10:50:03 +0800 Subject: [PATCH 2/9] add getWorkflowExecutionRunnable Override --- .../lifecycle/event/WorkflowTimeoutLifecycleEvent.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java index ad9eb8bc0512..4fa0d0303013 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java @@ -27,9 +27,6 @@ import java.util.concurrent.TimeUnit; -import lombok.Getter; - -@Getter public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent { private final IWorkflowExecutionRunnable workflowExecutionRunnable; @@ -40,6 +37,11 @@ protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflo this.workflowExecutionRunnable = workflowExecutionRunnable; } + @Override + public IWorkflowExecutionRunnable getWorkflowExecutionRunnable() { + return workflowExecutionRunnable; + } + public static WorkflowTimeoutLifecycleEvent of(final IWorkflowExecutionRunnable workflowExecutionRunnable) { final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); checkState(workflowInstance != null, "The workflow instance must be initialized before timeout monitoring."); From 876e3bd2eea5fea403002f356a2d9774e4f31425 Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Wed, 24 Dec 2025 09:57:58 +0800 Subject: [PATCH 3/9] add workflow timeout --- .../apache/dolphinscheduler/dao/AlertDao.java | 7 +- .../dao/repository/impl/AlertDaoTest.java | 51 +++++++ .../lifecycle/WorkflowLifecycleEventType.java | 4 + .../event/WorkflowTimeoutLifecycleEvent.java | 70 +++++++++ .../WorkflowStartLifecycleEventHandler.java | 17 ++- .../WorkflowTimeoutLifecycleEventHandler.java | 72 ++++++++++ .../WorkflowTimeoutLifecycleEventTest.java | 135 ++++++++++++++++++ ...orkflowStartLifecycleEventHandlerTest.java | 114 +++++++++++++++ ...kflowTimeoutLifecycleEventHandlerTest.java | 128 +++++++++++++++++ .../service/alert/WorkflowAlertManager.java | 47 +++--- 10 files changed, 625 insertions(+), 20 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java index 708c8ad6b8a0..9cc959af27f1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java @@ -192,9 +192,10 @@ public void sendServerStoppedAlert(String host, String serverType) { * workflow time out alert * * @param workflowInstance workflowInstance - * @param projectUser projectUser + * @param projectUser projectUser + * @param modifyBy modifyBy */ - public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser) { + public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser, String modifyBy) { int alertGroupId = workflowInstance.getWarningGroupId(); Alert alert = new Alert(); List workflowAlertContentList = new ArrayList<>(1); @@ -207,6 +208,8 @@ public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectU .workflowInstanceName(workflowInstance.getName()) .commandType(workflowInstance.getCommandType()) .workflowExecutionStatus(workflowInstance.getState()) + .modifyBy(modifyBy) + .recovery(workflowInstance.getRecovery()) .runTimes(workflowInstance.getRunTimes()) .workflowStartTime(workflowInstance.getStartTime()) .workflowHost(workflowInstance.getHost()) diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java index b284a4cd0967..c8033b5b3fc8 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java @@ -18,10 +18,16 @@ package org.apache.dolphinscheduler.dao.repository.impl; import org.apache.dolphinscheduler.common.enums.AlertStatus; +import org.apache.dolphinscheduler.common.enums.AlertType; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.BaseDaoTest; import org.apache.dolphinscheduler.dao.entity.Alert; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import java.util.Date; import java.util.List; import org.junit.jupiter.api.Assertions; @@ -66,4 +72,49 @@ void testSendServerStoppedAlert() { .count(); Assertions.assertEquals(1L, count); } + + @Test + void testSendWorkflowTimeoutAlert() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow-timeout"); + workflowInstance.setWorkflowDefinitionCode(100L); + workflowInstance.setCommandType(CommandType.START_PROCESS); + workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); + workflowInstance.setStartTime(new Date()); + workflowInstance.setHost("localhost"); + workflowInstance.setWarningGroupId(1); + + ProjectUser projectUser = new ProjectUser(); + projectUser.setProjectCode(1L); + projectUser.setProjectName("test-project"); + projectUser.setUserName("admin"); + + alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser, "admin"); + + List alerts = alertDao.listPendingAlerts(-1); + Assertions.assertNotNull(alerts); + + long timeoutAlertCount = alerts.stream() + .filter(alert -> AlertType.WORKFLOW_INSTANCE_TIMEOUT.equals(alert.getAlertType())) + .filter(alert -> alert.getWorkflowInstanceId() != null + && alert.getWorkflowInstanceId().equals(workflowInstance.getId())) + .count(); + Assertions.assertEquals(1L, timeoutAlertCount); + + Alert timeoutAlert = alerts.stream() + .filter(alert -> AlertType.WORKFLOW_INSTANCE_TIMEOUT.equals(alert.getAlertType())) + .filter(alert -> alert.getWorkflowInstanceId() != null + && alert.getWorkflowInstanceId().equals(workflowInstance.getId())) + .findFirst() + .orElse(null); + + Assertions.assertNotNull(timeoutAlert); + Assertions.assertEquals("Workflow Timeout Warn", timeoutAlert.getTitle()); + Assertions.assertEquals(projectUser.getProjectCode(), timeoutAlert.getProjectCode()); + Assertions.assertEquals(workflowInstance.getWorkflowDefinitionCode(), + timeoutAlert.getWorkflowDefinitionCode()); + Assertions.assertEquals(workflowInstance.getId(), timeoutAlert.getWorkflowInstanceId()); + Assertions.assertTrue(timeoutAlert.getContent().contains("test-workflow-timeout")); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java index 95070d62296e..833b1f7508fd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java @@ -57,5 +57,9 @@ public enum WorkflowLifecycleEventType implements ILifecycleEventType { * Finalize the workflow instance. */ FINALIZE, + /** + * The workflow instance timeout + */ + TIMEOUT, } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java new file mode 100644 index 000000000000..ad9eb8bc0512 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.AbstractWorkflowLifecycleLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; + +import java.util.concurrent.TimeUnit; + +import lombok.Getter; + +@Getter +public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent { + + private final IWorkflowExecutionRunnable workflowExecutionRunnable; + + protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable, + final long timeout) { + super(timeout); + this.workflowExecutionRunnable = workflowExecutionRunnable; + } + + public static WorkflowTimeoutLifecycleEvent of(final IWorkflowExecutionRunnable workflowExecutionRunnable) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + checkState(workflowInstance != null, "The workflow instance must be initialized before timeout monitoring."); + + final int timeout = workflowInstance.getTimeout(); + checkState(timeout >= 0, "The workflow timeout: %s must >=0 minutes", timeout); + + // Calculate remaining time until timeout: timeout - elapsed time + long delayTime = TimeUnit.MINUTES.toMillis(timeout) + - (System.currentTimeMillis() - workflowInstance.getStartTime().getTime()); + // Ensure delayTime is not negative (trigger immediately if already timeout) + delayTime = Math.max(0, delayTime); + return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime); + } + + @Override + public ILifecycleEventType getEventType() { + return WorkflowLifecycleEventType.TIMEOUT; + } + + @Override + public String toString() { + return "WorkflowTimeoutLifecycleEvent{" + + "workflow=" + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getName() + + ", timeout=" + delayTime + + '}'; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java index 2a3aba95e31a..adddd4f94d38 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java @@ -17,9 +17,11 @@ package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; @@ -37,7 +39,7 @@ public class WorkflowStartLifecycleEventHandler public void handle(final IWorkflowStateAction workflowStateAction, final IWorkflowExecutionRunnable workflowExecutionRunnable, final WorkflowStartLifecycleEvent workflowStartEvent) { - + workflowTimeoutMonitor(workflowExecutionRunnable); workflowStateAction.onStartEvent(workflowExecutionRunnable, workflowStartEvent); } @@ -45,4 +47,17 @@ public void handle(final IWorkflowStateAction workflowStateAction, public ILifecycleEventType matchEventType() { return WorkflowLifecycleEventType.START; } + + private void workflowTimeoutMonitor(final IWorkflowExecutionRunnable workflowExecutionRunnable) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + if (workflowInstance.getTimeout() <= 0) { + log.debug("The workflow {} timeout {} is not configured, skip timeout monitor.", + workflowInstance.getName(), + workflowInstance.getTimeout()); + return; + } + workflowExecutionRunnable.getWorkflowEventBus() + .publish(WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable)); + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java new file mode 100644 index 000000000000..1b6a10600be7 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; +import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowTimeoutLifecycleEventHandler + extends + AbstractWorkflowLifecycleEventHandler { + + private final WorkflowAlertManager workflowAlertManager; + + public WorkflowTimeoutLifecycleEventHandler(final WorkflowAlertManager workflowAlertManager) { + this.workflowAlertManager = workflowAlertManager; + } + + @Override + public void handle(final IWorkflowStateAction workflowStateAction, + final IWorkflowExecutionRunnable workflowExecutionRunnable, + final WorkflowTimeoutLifecycleEvent workflowTimeoutEvent) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + final String workflowName = workflowExecutionRunnable.getName(); + + // Check if workflow is still active (not finished) + if (workflowInstance.getState().isFinalState()) { + log.info("The workflow {} has been finished with state: {}, skip timeout alert.", + workflowName, + workflowInstance.getState().name()); + return; + } + + log.info("The workflow {} has timeout, try to send a timeout alert.", workflowName); + doWorkflowTimeoutAlert(workflowInstance); + } + + private void doWorkflowTimeoutAlert(final WorkflowInstance workflowInstance) { + // ProjectUser will be built in WorkflowAlertManager + workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance, null); + } + + @Override + public ILifecycleEventType matchEventType() { + return WorkflowLifecycleEventType.TIMEOUT; + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java new file mode 100644 index 000000000000..2288f51ca127 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext; + +import java.util.Date; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class WorkflowTimeoutLifecycleEventTest { + + @Test + void testCreateEventWithValidTimeout() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(60); + workflowInstance.setStartTime(new Date(System.currentTimeMillis() - 30 * 60 * 1000)); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); + + assertNotNull(event); + assertEquals(WorkflowLifecycleEventType.TIMEOUT, event.getEventType()); + assertEquals(workflowExecutionRunnable, event.getWorkflowExecutionRunnable()); + } + + @Test + void testCreateEventWithZeroTimeout() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(0); + workflowInstance.setStartTime(new Date()); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); + + assertNotNull(event); + } + + @Test + void testCreateEventWithAlreadyTimeout() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(30); + workflowInstance.setStartTime(new Date(System.currentTimeMillis() - 60 * 60 * 1000)); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); + + assertNotNull(event); + } + + @Test + void testCreateEventWithNullWorkflowInstance() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(null); + + assertThrows(IllegalStateException.class, + () -> WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable)); + } + + @Test + void testCreateEventWithNegativeTimeout() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(-1); + workflowInstance.setStartTime(new Date()); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + assertThrows(IllegalStateException.class, + () -> WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable)); + } + + @Test + void testToString() { + IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(60); + workflowInstance.setStartTime(new Date()); + + WorkflowExecuteContext context = mock(WorkflowExecuteContext.class); + when(context.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getWorkflowExecuteContext()).thenReturn(context); + + WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); + + String toString = event.toString(); + assertTrue(toString.contains("WorkflowTimeoutLifecycleEvent")); + assertTrue(toString.contains("test-workflow")); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java new file mode 100644 index 000000000000..2ca529a6b6af --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; + +import java.util.Date; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class WorkflowStartLifecycleEventHandlerTest { + + @Mock + private IWorkflowStateAction workflowStateAction; + + @Mock + private IWorkflowExecutionRunnable workflowExecutionRunnable; + + @Mock + private WorkflowStartLifecycleEvent workflowStartEvent; + + @Mock + private WorkflowEventBus workflowEventBus; + + private WorkflowStartLifecycleEventHandler handler; + + @BeforeEach + void setUp() { + handler = new WorkflowStartLifecycleEventHandler(); + } + + @Test + void testMatchEventType() { + assertEquals(WorkflowLifecycleEventType.START, handler.matchEventType()); + } + + @Test + void testHandleWithTimeoutConfigured() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(60); + workflowInstance.setStartTime(new Date()); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getWorkflowEventBus()).thenReturn(workflowEventBus); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowStartEvent); + + verify(workflowEventBus).publish(any(WorkflowTimeoutLifecycleEvent.class)); + verify(workflowStateAction).onStartEvent(workflowExecutionRunnable, workflowStartEvent); + } + + @Test + void testHandleWithNoTimeout() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(0); + workflowInstance.setStartTime(new Date()); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowStartEvent); + + verify(workflowStateAction).onStartEvent(workflowExecutionRunnable, workflowStartEvent); + } + + @Test + void testHandleWithNegativeTimeout() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setTimeout(-1); + workflowInstance.setStartTime(new Date()); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowStartEvent); + + verify(workflowStateAction).onStartEvent(workflowExecutionRunnable, workflowStartEvent); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java new file mode 100644 index 000000000000..ccd8b15d4dfe --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; +import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class WorkflowTimeoutLifecycleEventHandlerTest { + + @Mock + private WorkflowAlertManager workflowAlertManager; + + @Mock + private IWorkflowStateAction workflowStateAction; + + @Mock + private IWorkflowExecutionRunnable workflowExecutionRunnable; + + @Mock + private WorkflowTimeoutLifecycleEvent workflowTimeoutEvent; + + private WorkflowTimeoutLifecycleEventHandler handler; + + @BeforeEach + void setUp() { + handler = new WorkflowTimeoutLifecycleEventHandler(workflowAlertManager); + } + + @Test + void testMatchEventType() { + assertEquals(WorkflowLifecycleEventType.TIMEOUT, handler.matchEventType()); + } + + @Test + void testHandleWorkflowTimeoutWithRunningWorkflow() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); + workflowInstance.setWarningGroupId(1); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); + + verify(workflowAlertManager).sendWorkflowTimeoutAlert(eq(workflowInstance), eq(null)); + } + + @Test + void testHandleWorkflowTimeoutWithFinishedWorkflow() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setState(WorkflowExecutionStatus.SUCCESS); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); + + verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any(), any()); + } + + @Test + void testHandleWorkflowTimeoutWithFailedWorkflow() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setState(WorkflowExecutionStatus.FAILURE); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); + + verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any(), any()); + } + + @Test + void testHandleWorkflowTimeoutWithStoppedWorkflow() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setState(WorkflowExecutionStatus.STOP); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); + + verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any(), any()); + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java index bbe3ec3b07b7..ac0024d1994c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Objects; import lombok.extern.slf4j.Slf4j; @@ -103,15 +104,7 @@ public String getContentWorkflowInstance(WorkflowInstance workflowInstance, Project project) { String res; - WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao - .queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(), - workflowInstance.getWorkflowDefinitionVersion()); - - String modifyBy = ""; - if (workflowDefinitionLog != null) { - User operator = userDao.queryById(workflowDefinitionLog.getOperator()); - modifyBy = operator == null ? "" : operator.getUserName(); - } + String modifyBy = queryWorkflowOperator(workflowInstance); List successTaskList = new ArrayList<>(1); WorkflowAlertContent workflowAlertContent = WorkflowAlertContent.builder() @@ -136,6 +129,19 @@ public String getContentWorkflowInstance(WorkflowInstance workflowInstance, return res; } + private String queryWorkflowOperator(WorkflowInstance workflowInstance) { + WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao + .queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(), + workflowInstance.getWorkflowDefinitionVersion()); + + String modifyBy = ""; + if (workflowDefinitionLog != null) { + User operator = userDao.queryById(workflowDefinitionLog.getOperator()); + modifyBy = operator == null ? "" : operator.getUserName(); + } + return modifyBy; + } + /** * getting worker fault tolerant content * @@ -147,14 +153,7 @@ private String getWorkerToleranceContent(WorkflowInstance workflowInstance, List List toleranceTaskInstanceList = new ArrayList<>(); - WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao - .queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(), - workflowInstance.getWorkflowDefinitionVersion()); - String modifyBy = ""; - if (workflowDefinitionLog != null) { - User operator = userDao.queryById(workflowDefinitionLog.getOperator()); - modifyBy = operator == null ? "" : operator.getUserName(); - } + String modifyBy = queryWorkflowOperator(workflowInstance); for (TaskInstance taskInstance : toleranceTaskList) { WorkflowAlertContent workflowAlertContent = WorkflowAlertContent.builder() @@ -264,4 +263,18 @@ public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser = projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId()); alertDao.sendTaskTimeoutAlert(workflowInstance, taskInstance, projectUser); } + + public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser) { + // Get workflow definition log for modifyBy + String modifyBy = queryWorkflowOperator(workflowInstance); + if (Objects.isNull(projectUser)) { + Project project = projectDao.queryByCode(workflowInstance.getProjectCode()); + projectUser = new ProjectUser(); + projectUser.setProjectCode(project.getCode()); + projectUser.setProjectName(project.getName()); + projectUser.setUserName(project.getUserName()); + } + + alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser, modifyBy); + } } From dd87e6567f72acc34de2dfcb6679f38239e11b51 Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Wed, 24 Dec 2025 10:50:03 +0800 Subject: [PATCH 4/9] add getWorkflowExecutionRunnable Override --- .../lifecycle/event/WorkflowTimeoutLifecycleEvent.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java index ad9eb8bc0512..4fa0d0303013 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java @@ -27,9 +27,6 @@ import java.util.concurrent.TimeUnit; -import lombok.Getter; - -@Getter public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent { private final IWorkflowExecutionRunnable workflowExecutionRunnable; @@ -40,6 +37,11 @@ protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflo this.workflowExecutionRunnable = workflowExecutionRunnable; } + @Override + public IWorkflowExecutionRunnable getWorkflowExecutionRunnable() { + return workflowExecutionRunnable; + } + public static WorkflowTimeoutLifecycleEvent of(final IWorkflowExecutionRunnable workflowExecutionRunnable) { final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); checkState(workflowInstance != null, "The workflow instance must be initialized before timeout monitoring."); From b123cb9dcb404838adf07e1d9302eab6f883a3d0 Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Fri, 9 Jan 2026 17:51:46 +0800 Subject: [PATCH 5/9] update --- .../apache/dolphinscheduler/dao/AlertDao.java | 7 ++---- .../dao/repository/impl/AlertDaoTest.java | 2 +- .../WorkflowTimeoutLifecycleEventHandler.java | 9 +++++-- ...kflowTimeoutLifecycleEventHandlerTest.java | 24 +++++++++++++++---- .../service/alert/WorkflowAlertManager.java | 16 +++---------- 5 files changed, 33 insertions(+), 25 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java index 9cc959af27f1..708c8ad6b8a0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java @@ -192,10 +192,9 @@ public void sendServerStoppedAlert(String host, String serverType) { * workflow time out alert * * @param workflowInstance workflowInstance - * @param projectUser projectUser - * @param modifyBy modifyBy + * @param projectUser projectUser */ - public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser, String modifyBy) { + public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser) { int alertGroupId = workflowInstance.getWarningGroupId(); Alert alert = new Alert(); List workflowAlertContentList = new ArrayList<>(1); @@ -208,8 +207,6 @@ public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectU .workflowInstanceName(workflowInstance.getName()) .commandType(workflowInstance.getCommandType()) .workflowExecutionStatus(workflowInstance.getState()) - .modifyBy(modifyBy) - .recovery(workflowInstance.getRecovery()) .runTimes(workflowInstance.getRunTimes()) .workflowStartTime(workflowInstance.getStartTime()) .workflowHost(workflowInstance.getHost()) diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java index c8033b5b3fc8..991b3effd76a 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java @@ -90,7 +90,7 @@ void testSendWorkflowTimeoutAlert() { projectUser.setProjectName("test-project"); projectUser.setUserName("admin"); - alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser, "admin"); + alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser); List alerts = alertDao.listPendingAlerts(-1); Assertions.assertNotNull(alerts); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java index 1b6a10600be7..6cf1e2f2f3f6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java @@ -56,13 +56,18 @@ public void handle(final IWorkflowStateAction workflowStateAction, return; } + // Check if warning group is configured + if (workflowInstance.getWarningGroupId() == null) { + log.info("Skipped sending timeout alert for workflow {} because warningGroupId is null.", workflowName); + return; + } + log.info("The workflow {} has timeout, try to send a timeout alert.", workflowName); doWorkflowTimeoutAlert(workflowInstance); } private void doWorkflowTimeoutAlert(final WorkflowInstance workflowInstance) { - // ProjectUser will be built in WorkflowAlertManager - workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance, null); + workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance); } @Override diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java index ccd8b15d4dfe..9724c5ee2a12 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java @@ -78,7 +78,7 @@ void testHandleWorkflowTimeoutWithRunningWorkflow() { handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); - verify(workflowAlertManager).sendWorkflowTimeoutAlert(eq(workflowInstance), eq(null)); + verify(workflowAlertManager).sendWorkflowTimeoutAlert(eq(workflowInstance)); } @Test @@ -93,7 +93,7 @@ void testHandleWorkflowTimeoutWithFinishedWorkflow() { handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); - verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any(), any()); + verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any()); } @Test @@ -108,7 +108,7 @@ void testHandleWorkflowTimeoutWithFailedWorkflow() { handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); - verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any(), any()); + verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any()); } @Test @@ -123,6 +123,22 @@ void testHandleWorkflowTimeoutWithStoppedWorkflow() { handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); - verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any(), any()); + verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any()); + } + + @Test + void testHandleWorkflowTimeoutWithNullWarningGroupId() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setName("test-workflow"); + workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); + workflowInstance.setWarningGroupId(null); + + when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); + when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); + + handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); + + verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any()); } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java index ac0024d1994c..9f1f3a302e1c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java @@ -38,7 +38,6 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.Objects; import lombok.extern.slf4j.Slf4j; @@ -264,17 +263,8 @@ public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance, alertDao.sendTaskTimeoutAlert(workflowInstance, taskInstance, projectUser); } - public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser) { - // Get workflow definition log for modifyBy - String modifyBy = queryWorkflowOperator(workflowInstance); - if (Objects.isNull(projectUser)) { - Project project = projectDao.queryByCode(workflowInstance.getProjectCode()); - projectUser = new ProjectUser(); - projectUser.setProjectCode(project.getCode()); - projectUser.setProjectName(project.getName()); - projectUser.setUserName(project.getUserName()); - } - - alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser, modifyBy); + public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance) { + ProjectUser projectUser = projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId()); + alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser); } } From 3057bf1ddecffb0cb42b3412cc2695aa9947e325 Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Fri, 9 Jan 2026 17:59:46 +0800 Subject: [PATCH 6/9] update --- .../workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java index 4fa0d0303013..23322c72b6fa 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java @@ -52,8 +52,6 @@ public static WorkflowTimeoutLifecycleEvent of(final IWorkflowExecutionRunnable // Calculate remaining time until timeout: timeout - elapsed time long delayTime = TimeUnit.MINUTES.toMillis(timeout) - (System.currentTimeMillis() - workflowInstance.getStartTime().getTime()); - // Ensure delayTime is not negative (trigger immediately if already timeout) - delayTime = Math.max(0, delayTime); return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime); } From e6668897151cc8415a88c27793b9cceb49222e91 Mon Sep 17 00:00:00 2001 From: Zzih96 <158246610+Zzih96@users.noreply.github.com> Date: Mon, 26 Jan 2026 16:32:53 +0800 Subject: [PATCH 7/9] Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java Co-authored-by: Wenjun Ruan --- .../lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java index 6cf1e2f2f3f6..eb8f5747dbca 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java @@ -48,7 +48,6 @@ public void handle(final IWorkflowStateAction workflowStateAction, final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); final String workflowName = workflowExecutionRunnable.getName(); - // Check if workflow is still active (not finished) if (workflowInstance.getState().isFinalState()) { log.info("The workflow {} has been finished with state: {}, skip timeout alert.", workflowName, From 500c718b928e82e80ce141ef548261a4b99873c5 Mon Sep 17 00:00:00 2001 From: Zzih96 <158246610+Zzih96@users.noreply.github.com> Date: Mon, 26 Jan 2026 16:33:01 +0800 Subject: [PATCH 8/9] Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java Co-authored-by: Wenjun Ruan --- .../handler/WorkflowTimeoutLifecycleEventHandler.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java index eb8f5747dbca..6c5845408fd4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java @@ -55,11 +55,6 @@ public void handle(final IWorkflowStateAction workflowStateAction, return; } - // Check if warning group is configured - if (workflowInstance.getWarningGroupId() == null) { - log.info("Skipped sending timeout alert for workflow {} because warningGroupId is null.", workflowName); - return; - } log.info("The workflow {} has timeout, try to send a timeout alert.", workflowName); doWorkflowTimeoutAlert(workflowInstance); From 03bff5f60380ba7d8738e4bc242714b7a342d655 Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Mon, 26 Jan 2026 16:37:54 +0800 Subject: [PATCH 9/9] remove UT --- .../WorkflowTimeoutLifecycleEventTest.java | 135 ---------------- ...orkflowStartLifecycleEventHandlerTest.java | 114 -------------- ...kflowTimeoutLifecycleEventHandlerTest.java | 144 ------------------ 3 files changed, 393 deletions(-) delete mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java delete mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java delete mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java deleted file mode 100644 index 2288f51ca127..000000000000 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEventTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; -import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext; - -import java.util.Date; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class WorkflowTimeoutLifecycleEventTest { - - @Test - void testCreateEventWithValidTimeout() { - IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setTimeout(60); - workflowInstance.setStartTime(new Date(System.currentTimeMillis() - 30 * 60 * 1000)); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - - WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); - - assertNotNull(event); - assertEquals(WorkflowLifecycleEventType.TIMEOUT, event.getEventType()); - assertEquals(workflowExecutionRunnable, event.getWorkflowExecutionRunnable()); - } - - @Test - void testCreateEventWithZeroTimeout() { - IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setTimeout(0); - workflowInstance.setStartTime(new Date()); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - - WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); - - assertNotNull(event); - } - - @Test - void testCreateEventWithAlreadyTimeout() { - IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setTimeout(30); - workflowInstance.setStartTime(new Date(System.currentTimeMillis() - 60 * 60 * 1000)); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - - WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); - - assertNotNull(event); - } - - @Test - void testCreateEventWithNullWorkflowInstance() { - IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(null); - - assertThrows(IllegalStateException.class, - () -> WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable)); - } - - @Test - void testCreateEventWithNegativeTimeout() { - IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setTimeout(-1); - workflowInstance.setStartTime(new Date()); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - - assertThrows(IllegalStateException.class, - () -> WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable)); - } - - @Test - void testToString() { - IWorkflowExecutionRunnable workflowExecutionRunnable = mock(IWorkflowExecutionRunnable.class); - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setTimeout(60); - workflowInstance.setStartTime(new Date()); - - WorkflowExecuteContext context = mock(WorkflowExecuteContext.class); - when(context.getWorkflowInstance()).thenReturn(workflowInstance); - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - when(workflowExecutionRunnable.getWorkflowExecuteContext()).thenReturn(context); - - WorkflowTimeoutLifecycleEvent event = WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable); - - String toString = event.toString(); - assertTrue(toString.contains("WorkflowTimeoutLifecycleEvent")); - assertTrue(toString.contains("test-workflow")); - } -} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java deleted file mode 100644 index 2ca529a6b6af..000000000000 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandlerTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; -import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; -import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent; -import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; -import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; -import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; - -import java.util.Date; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class WorkflowStartLifecycleEventHandlerTest { - - @Mock - private IWorkflowStateAction workflowStateAction; - - @Mock - private IWorkflowExecutionRunnable workflowExecutionRunnable; - - @Mock - private WorkflowStartLifecycleEvent workflowStartEvent; - - @Mock - private WorkflowEventBus workflowEventBus; - - private WorkflowStartLifecycleEventHandler handler; - - @BeforeEach - void setUp() { - handler = new WorkflowStartLifecycleEventHandler(); - } - - @Test - void testMatchEventType() { - assertEquals(WorkflowLifecycleEventType.START, handler.matchEventType()); - } - - @Test - void testHandleWithTimeoutConfigured() { - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setTimeout(60); - workflowInstance.setStartTime(new Date()); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - when(workflowExecutionRunnable.getWorkflowEventBus()).thenReturn(workflowEventBus); - - handler.handle(workflowStateAction, workflowExecutionRunnable, workflowStartEvent); - - verify(workflowEventBus).publish(any(WorkflowTimeoutLifecycleEvent.class)); - verify(workflowStateAction).onStartEvent(workflowExecutionRunnable, workflowStartEvent); - } - - @Test - void testHandleWithNoTimeout() { - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setTimeout(0); - workflowInstance.setStartTime(new Date()); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - - handler.handle(workflowStateAction, workflowExecutionRunnable, workflowStartEvent); - - verify(workflowStateAction).onStartEvent(workflowExecutionRunnable, workflowStartEvent); - } - - @Test - void testHandleWithNegativeTimeout() { - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setTimeout(-1); - workflowInstance.setStartTime(new Date()); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - - handler.handle(workflowStateAction, workflowExecutionRunnable, workflowStartEvent); - - verify(workflowStateAction).onStartEvent(workflowExecutionRunnable, workflowStartEvent); - } -} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java deleted file mode 100644 index 9724c5ee2a12..000000000000 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandlerTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; -import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; -import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; -import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; -import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class WorkflowTimeoutLifecycleEventHandlerTest { - - @Mock - private WorkflowAlertManager workflowAlertManager; - - @Mock - private IWorkflowStateAction workflowStateAction; - - @Mock - private IWorkflowExecutionRunnable workflowExecutionRunnable; - - @Mock - private WorkflowTimeoutLifecycleEvent workflowTimeoutEvent; - - private WorkflowTimeoutLifecycleEventHandler handler; - - @BeforeEach - void setUp() { - handler = new WorkflowTimeoutLifecycleEventHandler(workflowAlertManager); - } - - @Test - void testMatchEventType() { - assertEquals(WorkflowLifecycleEventType.TIMEOUT, handler.matchEventType()); - } - - @Test - void testHandleWorkflowTimeoutWithRunningWorkflow() { - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); - workflowInstance.setWarningGroupId(1); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); - - handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); - - verify(workflowAlertManager).sendWorkflowTimeoutAlert(eq(workflowInstance)); - } - - @Test - void testHandleWorkflowTimeoutWithFinishedWorkflow() { - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setState(WorkflowExecutionStatus.SUCCESS); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); - - handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); - - verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any()); - } - - @Test - void testHandleWorkflowTimeoutWithFailedWorkflow() { - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setState(WorkflowExecutionStatus.FAILURE); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); - - handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); - - verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any()); - } - - @Test - void testHandleWorkflowTimeoutWithStoppedWorkflow() { - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setState(WorkflowExecutionStatus.STOP); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); - - handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); - - verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any()); - } - - @Test - void testHandleWorkflowTimeoutWithNullWarningGroupId() { - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - workflowInstance.setName("test-workflow"); - workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); - workflowInstance.setWarningGroupId(null); - - when(workflowExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance); - when(workflowExecutionRunnable.getName()).thenReturn("test-workflow"); - - handler.handle(workflowStateAction, workflowExecutionRunnable, workflowTimeoutEvent); - - verify(workflowAlertManager, never()).sendWorkflowTimeoutAlert(any()); - } -}