Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.WorkflowExecutionRunnableFactory;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;
import org.apache.dolphinscheduler.service.command.CommandService;

import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -177,6 +178,7 @@ private CompletableFuture<Void> bootstrapWorkflowExecutionRunnable(IWorkflowExec
return CompletableFuture.completedFuture(null);
}

WorkflowInstanceMetrics.recordWorkflowInstanceSubmit(workflowInstance.getWorkflowDefinitionCode());
workflowRepository.put(workflowExecutionRunnable);
workflowEventBusCoordinator.registerWorkflowEventBus(workflowExecutionRunnable);
workflowExecutionRunnable.getWorkflowEventBus()
Expand All @@ -203,6 +205,9 @@ private Void bootstrapError(Command command, Throwable throwable) {
final int workflowInstanceId = command.getWorkflowInstanceId();

workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstanceId, WorkflowExecutionStatus.FAILURE);
WorkflowInstanceMetrics.recordWorkflowInstanceFinish(
WorkflowExecutionStatus.FAILURE,
command.getWorkflowDefinitionCode());
log.info("Set workflow instance {} state to FAILURE", workflowInstanceId);

commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -86,6 +87,13 @@ public Void doInTransaction(TransactionStatus status) {
workflowInstance.setEndTime(DateUtils.getCurrentDate());
workflowInstance.setState(WorkflowExecutionStatus.STOP);
workflowInstanceDao.upsertWorkflowInstance(workflowInstance);
final Long workflowDefinitionCode =
serialCommand.getWorkflowDefinitionCode() != null
? serialCommand.getWorkflowDefinitionCode()
: workflowInstance.getWorkflowDefinitionCode();
WorkflowInstanceMetrics.recordWorkflowInstanceFinish(
WorkflowExecutionStatus.STOP,
workflowDefinitionCode);
return null;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;

Expand Down Expand Up @@ -146,6 +147,9 @@ protected void workflowFinish(final IWorkflowExecutionRunnable workflowExecution
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
workflowInstance.setEndTime(new Date());
transformWorkflowInstanceState(workflowExecutionRunnable, workflowExecutionStatus);
WorkflowInstanceMetrics.recordWorkflowInstanceFinish(
workflowExecutionStatus,
workflowInstance.getWorkflowDefinitionCode());
if (workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowDefinition().getExecutionType()
.isSerial()) {
if (serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId()) > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam;
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -48,6 +49,7 @@ public void failoverWorkflow(final WorkflowInstance workflowInstance) {
workflowInstance.getId(),
workflowInstance.getState(),
WorkflowExecutionStatus.FAILOVER);
WorkflowInstanceMetrics.recordWorkflowInstanceFailover(workflowInstance.getWorkflowDefinitionCode());

final WorkflowFailoverCommandParam failoverWorkflowCommandParam = WorkflowFailoverCommandParam.builder()
.workflowExecutionStatus(workflowInstance.getState())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.server.master.metrics;

import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;

import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand All @@ -36,7 +38,16 @@
public class WorkflowInstanceMetrics {

private final Set<String> workflowInstanceStates = ImmutableSet.of(
"submit", "timeout", "finish", "failover", "success", "fail", "stop");
WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(),
WorkflowExecutionStatus.RUNNING_EXECUTION.name(),
WorkflowExecutionStatus.READY_PAUSE.name(),
WorkflowExecutionStatus.PAUSE.name(),
WorkflowExecutionStatus.READY_STOP.name(),
WorkflowExecutionStatus.FAILOVER.name(),
WorkflowExecutionStatus.SUCCESS.name(),
WorkflowExecutionStatus.FAILURE.name(),
WorkflowExecutionStatus.STOP.name(),
WorkflowExecutionStatus.SERIAL_WAIT.name());

static {
for (final String state : workflowInstanceStates) {
Expand Down Expand Up @@ -78,18 +89,39 @@ public synchronized void registerWorkflowInstanceResubmitGauge(Supplier<Number>
.register(Metrics.globalRegistry);
}

public void incWorkflowInstanceByStateAndWorkflowDefinitionCode(final String state,
public void incWorkflowInstanceByStateAndWorkflowDefinitionCode(final WorkflowExecutionStatus state,
final String workflowDefinitionCode) {
// When tags need to be determined from local context,
// you have no choice but to construct or lookup the Meter inside your method body.
// The lookup cost is just a single hash lookup, so it is acceptable for most use cases.
Metrics.globalRegistry.counter(
"ds.workflow.instance.count",
"state", state,
"state", state.name(),
"workflow.definition.code", workflowDefinitionCode)
.increment();
}

public void recordWorkflowInstanceSubmit(final Long workflowDefinitionCode) {
incWorkflowInstanceByStateAndWorkflowDefinitionCode(
WorkflowExecutionStatus.SUBMITTED_SUCCESS,
String.valueOf(workflowDefinitionCode));
}

public void recordWorkflowInstanceFailover(final Long workflowDefinitionCode) {
incWorkflowInstanceByStateAndWorkflowDefinitionCode(
WorkflowExecutionStatus.FAILOVER,
String.valueOf(workflowDefinitionCode));
}

public void recordWorkflowInstanceFinish(final WorkflowExecutionStatus workflowExecutionStatus,
final Long workflowDefinitionCode) {
if (workflowExecutionStatus == null || !workflowExecutionStatus.isFinalState()) {
return;
}
incWorkflowInstanceByStateAndWorkflowDefinitionCode(workflowExecutionStatus,
String.valueOf(workflowDefinitionCode));
}

public void cleanUpWorkflowInstanceCountMetricsByDefinitionCode(final Long workflowDefinitionCode) {
for (final String state : workflowInstanceStates) {
final Counter counter = Metrics.globalRegistry.counter(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.integration.cases;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;

import java.time.Duration;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;

public class WorkflowInstanceMetricsTestCase extends AbstractMasterIntegrationTestCase {

@Test
@DisplayName("Test workflow instance metrics for a successful workflow")
public void testWorkflowInstanceMetrics_with_oneSuccessTask() {
final String yaml = "/it/metrics/workflow_with_one_fake_task_success_for_metrics.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getOneWorkflow();
final long workflowDefinitionCode = workflow.getCode();
WorkflowInstanceMetrics.cleanUpWorkflowInstanceCountMetricsByDefinitionCode(workflowDefinitionCode);

final double submitBefore = workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(),
workflowDefinitionCode);
final double successBefore = workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(),
workflowDefinitionCode);

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(),
workflowDefinitionCode)).isEqualTo(submitBefore + 1.0d);
assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(), workflowDefinitionCode))
.isEqualTo(successBefore + 1.0d);
});

masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test workflow instance metrics for serial discard strategy")
public void testWorkflowInstanceMetrics_with_serialDiscardStrategy() {
final String yaml = "/it/metrics/workflow_with_serial_discard_strategy_for_metrics.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getOneWorkflow();
final long workflowDefinitionCode = workflow.getCode();
WorkflowInstanceMetrics.cleanUpWorkflowInstanceCountMetricsByDefinitionCode(workflowDefinitionCode);

final double submitBefore = workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(),
workflowDefinitionCode);
final double successBefore = workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(),
workflowDefinitionCode);
final double stopBefore = workflowInstanceCount(WorkflowExecutionStatus.STOP.name(),
workflowDefinitionCode);

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
final Integer workflowInstanceId1 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
final Integer workflowInstanceId2 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
final Integer workflowInstanceId3 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
final WorkflowInstance workflowInstance1 = repository.queryWorkflowInstance(workflowInstanceId1);
final WorkflowInstance workflowInstance2 = repository.queryWorkflowInstance(workflowInstanceId2);
final WorkflowInstance workflowInstance3 = repository.queryWorkflowInstance(workflowInstanceId3);
assertThat(workflowInstance1.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
assertThat(workflowInstance2.getState()).isEqualTo(WorkflowExecutionStatus.STOP);
assertThat(workflowInstance3.getState()).isEqualTo(WorkflowExecutionStatus.STOP);

assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(),
workflowDefinitionCode)).isEqualTo(submitBefore + 1.0d);
assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(), workflowDefinitionCode))
.isEqualTo(successBefore + 1.0d);
assertThat(workflowInstanceCount(WorkflowExecutionStatus.STOP.name(), workflowDefinitionCode))
.isEqualTo(stopBefore + 2.0d);
});

masterContainer.assertAllResourceReleased();
}

private double workflowInstanceCount(final String state, final long workflowDefinitionCode) {
final Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count")
.tags(
"state",
state,
"workflow.definition.code",
String.valueOf(workflowDefinitionCode))
.counter();
return counter == null ? 0.0d : counter.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ master:
group: default
server-load-protection:
# If set true, will open master overload protection
enabled: true
enabled: false
# Master max system cpu usage, when the master's system cpu usage is smaller then this value, master server can execute workflow.
max-system-cpu-usage-percentage-thresholds: 0.9
# Master max jvm cpu usage, when the master's jvm cpu usage is smaller then this value, master server can execute workflow.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

project:
name: MasterIntegrationTest
code: 2101
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00

workflows:
- name: workflow_with_one_fake_task_success_for_metrics
code: 210101
version: 1
projectCode: 2101
description: This is a fake workflow with single task for metrics
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
userId: 1
executionType: PARALLEL

tasks:
- name: A
code: 2101001
version: 1
projectCode: 2101
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH

taskRelations:
- projectCode: 2101
workflowDefinitionCode: 210101
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 2101001
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
Loading
Loading