diff --git a/dolphinscheduler-master/pom.xml b/dolphinscheduler-master/pom.xml index bcc2e7b2b9dc..77602914df6f 100644 --- a/dolphinscheduler-master/pom.xml +++ b/dolphinscheduler-master/pom.xml @@ -307,6 +307,10 @@ org.apache.logging.log4j log4j-to-slf4j + + com.vaadin.external.google + android-json + diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java index 1e2469526413..798e169713ad 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java @@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.WorkflowExecutionRunnableFactory; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; +import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics; import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.commons.collections4.CollectionUtils; @@ -177,6 +178,7 @@ private CompletableFuture bootstrapWorkflowExecutionRunnable(IWorkflowExec return CompletableFuture.completedFuture(null); } + WorkflowInstanceMetrics.recordWorkflowInstanceSubmit(workflowInstance.getWorkflowDefinitionCode()); workflowRepository.put(workflowExecutionRunnable); workflowEventBusCoordinator.registerWorkflowEventBus(workflowExecutionRunnable); workflowExecutionRunnable.getWorkflowEventBus() @@ -203,6 +205,9 @@ private Void bootstrapError(Command command, Throwable throwable) { final int workflowInstanceId = command.getWorkflowInstanceId(); workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstanceId, WorkflowExecutionStatus.FAILURE); + WorkflowInstanceMetrics.recordWorkflowInstanceFinish( + WorkflowExecutionStatus.FAILURE, + command.getWorkflowDefinitionCode()); log.info("Set workflow instance {} state to FAILURE", workflowInstanceId); commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable)); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/AbstractSerialCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/AbstractSerialCommandHandler.java index 8898fe458ee5..7a67f220856f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/AbstractSerialCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/AbstractSerialCommandHandler.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics; import lombok.extern.slf4j.Slf4j; @@ -86,6 +87,9 @@ public Void doInTransaction(TransactionStatus status) { workflowInstance.setEndTime(DateUtils.getCurrentDate()); workflowInstance.setState(WorkflowExecutionStatus.STOP); workflowInstanceDao.upsertWorkflowInstance(workflowInstance); + WorkflowInstanceMetrics.recordWorkflowInstanceFinish( + WorkflowExecutionStatus.STOP, + workflowInstance.getWorkflowDefinitionCode()); return null; } }); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java index 459abc8a64be..59e770c4fb1e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics; import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils; import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; @@ -146,6 +147,9 @@ protected void workflowFinish(final IWorkflowExecutionRunnable workflowExecution final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); workflowInstance.setEndTime(new Date()); transformWorkflowInstanceState(workflowExecutionRunnable, workflowExecutionStatus); + WorkflowInstanceMetrics.recordWorkflowInstanceFinish( + workflowExecutionStatus, + workflowInstance.getWorkflowDefinitionCode()); if (workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowDefinition().getExecutionType() .isSerial()) { if (serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId()) > 0) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/WorkflowFailover.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/WorkflowFailover.java index d3d8ec066dad..24c4dde255e7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/WorkflowFailover.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/WorkflowFailover.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.repository.CommandDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam; +import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics; import lombok.extern.slf4j.Slf4j; @@ -48,6 +49,7 @@ public void failoverWorkflow(final WorkflowInstance workflowInstance) { workflowInstance.getId(), workflowInstance.getState(), WorkflowExecutionStatus.FAILOVER); + WorkflowInstanceMetrics.recordWorkflowInstanceFailover(workflowInstance.getWorkflowDefinitionCode()); final WorkflowFailoverCommandParam failoverWorkflowCommandParam = WorkflowFailoverCommandParam.builder() .workflowExecutionStatus(workflowInstance.getState()) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetrics.java index 013c517bde87..a0b0787d7a27 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetrics.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetrics.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.metrics; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; + import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -36,7 +38,16 @@ public class WorkflowInstanceMetrics { private final Set workflowInstanceStates = ImmutableSet.of( - "submit", "timeout", "finish", "failover", "success", "fail", "stop"); + WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(), + WorkflowExecutionStatus.RUNNING_EXECUTION.name(), + WorkflowExecutionStatus.READY_PAUSE.name(), + WorkflowExecutionStatus.PAUSE.name(), + WorkflowExecutionStatus.READY_STOP.name(), + WorkflowExecutionStatus.FAILOVER.name(), + WorkflowExecutionStatus.SUCCESS.name(), + WorkflowExecutionStatus.FAILURE.name(), + WorkflowExecutionStatus.STOP.name(), + WorkflowExecutionStatus.SERIAL_WAIT.name()); static { for (final String state : workflowInstanceStates) { @@ -78,18 +89,39 @@ public synchronized void registerWorkflowInstanceResubmitGauge(Supplier .register(Metrics.globalRegistry); } - public void incWorkflowInstanceByStateAndWorkflowDefinitionCode(final String state, + public void incWorkflowInstanceByStateAndWorkflowDefinitionCode(final WorkflowExecutionStatus state, final String workflowDefinitionCode) { // When tags need to be determined from local context, // you have no choice but to construct or lookup the Meter inside your method body. // The lookup cost is just a single hash lookup, so it is acceptable for most use cases. Metrics.globalRegistry.counter( "ds.workflow.instance.count", - "state", state, + "state", state.name(), "workflow.definition.code", workflowDefinitionCode) .increment(); } + public void recordWorkflowInstanceSubmit(final Long workflowDefinitionCode) { + incWorkflowInstanceByStateAndWorkflowDefinitionCode( + WorkflowExecutionStatus.SUBMITTED_SUCCESS, + String.valueOf(workflowDefinitionCode)); + } + + public void recordWorkflowInstanceFailover(final Long workflowDefinitionCode) { + incWorkflowInstanceByStateAndWorkflowDefinitionCode( + WorkflowExecutionStatus.FAILOVER, + String.valueOf(workflowDefinitionCode)); + } + + public void recordWorkflowInstanceFinish(final WorkflowExecutionStatus workflowExecutionStatus, + final Long workflowDefinitionCode) { + if (workflowExecutionStatus == null || !workflowExecutionStatus.isFinalState()) { + return; + } + incWorkflowInstanceByStateAndWorkflowDefinitionCode(workflowExecutionStatus, + String.valueOf(workflowDefinitionCode)); + } + public void cleanUpWorkflowInstanceCountMetricsByDefinitionCode(final Long workflowDefinitionCode) { for (final String state : workflowInstanceStates) { final Counter counter = Metrics.globalRegistry.counter( diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceMetricsTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceMetricsTestCase.java new file mode 100644 index 000000000000..def08e537db3 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceMetricsTestCase.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.integration.cases; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam; +import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase; +import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator; +import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext; +import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics; + +import java.time.Duration; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; + +public class WorkflowInstanceMetricsTestCase extends AbstractMasterIntegrationTestCase { + + @Test + @DisplayName("Test workflow instance metrics for a successful workflow") + public void testWorkflowInstanceMetrics_with_oneSuccessTask() { + final String yaml = "/it/metrics/workflow_with_one_fake_task_success_for_metrics.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition workflow = context.getOneWorkflow(); + final long workflowDefinitionCode = workflow.getCode(); + WorkflowInstanceMetrics.cleanUpWorkflowInstanceCountMetricsByDefinitionCode(workflowDefinitionCode); + + final double submitBefore = workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(), + workflowDefinitionCode); + final double successBefore = workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(), + workflowDefinitionCode); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(workflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .build(); + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + await() + .atMost(Duration.ofMinutes(2)) + .untilAsserted(() -> { + assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState()) + .isEqualTo(WorkflowExecutionStatus.SUCCESS); + assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(), + workflowDefinitionCode)).isEqualTo(submitBefore + 1.0d); + assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(), workflowDefinitionCode)) + .isEqualTo(successBefore + 1.0d); + }); + + masterContainer.assertAllResourceReleased(); + } + + @Test + @DisplayName("Test workflow instance metrics for serial discard strategy") + public void testWorkflowInstanceMetrics_with_serialDiscardStrategy() { + final String yaml = "/it/metrics/workflow_with_serial_discard_strategy_for_metrics.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition workflow = context.getOneWorkflow(); + final long workflowDefinitionCode = workflow.getCode(); + WorkflowInstanceMetrics.cleanUpWorkflowInstanceCountMetricsByDefinitionCode(workflowDefinitionCode); + + final double submitBefore = workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(), + workflowDefinitionCode); + final double successBefore = workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(), + workflowDefinitionCode); + final double stopBefore = workflowInstanceCount(WorkflowExecutionStatus.STOP.name(), + workflowDefinitionCode); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(workflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .build(); + final Integer workflowInstanceId1 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + final Integer workflowInstanceId2 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + final Integer workflowInstanceId3 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + await() + .atMost(Duration.ofMinutes(2)) + .untilAsserted(() -> { + final WorkflowInstance workflowInstance1 = repository.queryWorkflowInstance(workflowInstanceId1); + final WorkflowInstance workflowInstance2 = repository.queryWorkflowInstance(workflowInstanceId2); + final WorkflowInstance workflowInstance3 = repository.queryWorkflowInstance(workflowInstanceId3); + assertThat(workflowInstance1.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS); + assertThat(workflowInstance2.getState()).isEqualTo(WorkflowExecutionStatus.STOP); + assertThat(workflowInstance3.getState()).isEqualTo(WorkflowExecutionStatus.STOP); + + assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(), + workflowDefinitionCode)).isEqualTo(submitBefore + 1.0d); + assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(), workflowDefinitionCode)) + .isEqualTo(successBefore + 1.0d); + assertThat(workflowInstanceCount(WorkflowExecutionStatus.STOP.name(), workflowDefinitionCode)) + .isEqualTo(stopBefore + 2.0d); + }); + + masterContainer.assertAllResourceReleased(); + } + + private double workflowInstanceCount(final String state, final long workflowDefinitionCode) { + final Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count") + .tags( + "state", + state, + "workflow.definition.code", + String.valueOf(workflowDefinitionCode)) + .counter(); + return counter == null ? 0.0d : counter.count(); + } +} diff --git a/dolphinscheduler-master/src/test/resources/it/metrics/workflow_with_one_fake_task_success_for_metrics.yaml b/dolphinscheduler-master/src/test/resources/it/metrics/workflow_with_one_fake_task_success_for_metrics.yaml new file mode 100644 index 000000000000..dbc1ca32f71b --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/metrics/workflow_with_one_fake_task_success_for_metrics.yaml @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 2101 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflows: + - name: workflow_with_one_fake_task_success_for_metrics + code: 210101 + version: 1 + projectCode: 2101 + description: This is a fake workflow with single task for metrics + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + +tasks: + - name: A + code: 2101001 + version: 1 + projectCode: 2101 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 30"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 2101 + workflowDefinitionCode: 210101 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 2101001 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 diff --git a/dolphinscheduler-master/src/test/resources/it/metrics/workflow_with_serial_discard_strategy_for_metrics.yaml b/dolphinscheduler-master/src/test/resources/it/metrics/workflow_with_serial_discard_strategy_for_metrics.yaml new file mode 100644 index 000000000000..409fa7e328fe --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/metrics/workflow_with_serial_discard_strategy_for_metrics.yaml @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 2102 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflows: + - name: workflow_with_serial_discard_strategy_for_metrics + code: 210201 + version: 1 + projectCode: 2102 + description: This is a fake workflow with single task for metrics + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: SERIAL_DISCARD + +tasks: + - name: A + code: 2102001 + version: 1 + projectCode: 2102 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 30"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 2102 + workflowDefinitionCode: 210201 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 2102001 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00