diff --git a/dinky-admin/src/main/java/org/dinky/controller/MetricController.java b/dinky-admin/src/main/java/org/dinky/controller/MetricController.java new file mode 100644 index 0000000000..ec82fcd3c1 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/controller/MetricController.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.controller; + +import org.dinky.metric.PrometheusService; +import org.dinky.metric.base.MetricType; + +import java.util.List; + +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import cn.dev33.satoken.annotation.SaIgnore; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * MetricController + * + * @since 2025/9/4 11:32 + */ +@Slf4j +@Api(tags = "Metrric Controller") +@RestController +@RequestMapping("/metric") +@RequiredArgsConstructor +@SaIgnore +public class MetricController { + + private final PrometheusService prometheusService; + + @GetMapping(value = "/prometheus", produces = MediaType.TEXT_PLAIN_VALUE) + public String prometheus() { + return prometheusService.retrieveAllFlinkJobMetrics(null); + } + + @GetMapping(value = "/prometheus/{types}", produces = MediaType.TEXT_PLAIN_VALUE) + public String prometheus(@PathVariable List types) { + return prometheusService.retrieveAllFlinkJobMetrics(types); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java b/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java index 55fd072a3e..343156193c 100644 --- a/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java +++ b/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java @@ -49,6 +49,9 @@ public interface JobInstanceMapper extends SuperMapper { @InterceptorIgnore(tenantLine = "true") List listJobInstanceActive(); + @InterceptorIgnore(tenantLine = "true") + List listAllJobInstances(); + JobInstance getJobInstanceByTaskId(Integer id); @InterceptorIgnore(tenantLine = "true") diff --git a/dinky-admin/src/main/java/org/dinky/metric/PrometheusService.java b/dinky-admin/src/main/java/org/dinky/metric/PrometheusService.java new file mode 100644 index 0000000000..431a7c2c60 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/PrometheusService.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric; + +import org.dinky.metric.base.MetricService; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.stereotype.Service; + +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class PrometheusService extends MetricService { + + @Override + protected String formatFlinkJobMetrics(List> metrics) { + StringBuilder sb = new StringBuilder(); + + for (HashMap metric : metrics) { + if (!metric.containsKey("name") || !metric.containsKey("value")) { + continue; + } + + String name = metric.get("name").toString().toLowerCase(); + Object value = metric.get("value"); + + StringBuilder labels = new StringBuilder(); + for (Map.Entry entry : metric.entrySet()) { + String key = entry.getKey(); + if ("name".equals(key) || "value".equals(key)) { + continue; + } + if (labels.length() > 0) { + labels.append(","); + } + labels.append(key).append("=\"").append(entry.getValue()).append("\""); + } + + sb.append(name); + if (labels.length() > 0) { + sb.append("{").append(labels).append("}"); + } + sb.append(" ").append(value).append("\n"); + } + + return sb.toString(); + } + + @Override + protected String mergeFlinkJobMetrics(List metricGroups) { + return String.join("\n", metricGroups); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricKeys.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricKeys.java new file mode 100644 index 0000000000..3cd050068d --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricKeys.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric.base; + +public class MetricKeys { + + public static final String DINKY_FLINK_TASK_ID = "task_id"; + public static final String DINKY_FLINK_TASK_NAME = "task_name"; + public static final String DINKY_FLINK_TASK_STATUS = "task_status"; + + public static final String DINKY_FLINK_TASK_MANAGER_ID = "task_manager_id"; + + public static final String DINKY_FLINK_CLUSTER_NAME = "task_cluster"; + + public static final String DINKY_FLINK_TASK_VERTICE_ID = "task_vertice_id"; + + public static final String DINKY_FLINK_TASK_DEPLOY_STATUS = "task_deploy_status"; +} diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricNames.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricNames.java new file mode 100644 index 0000000000..50909a8662 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricNames.java @@ -0,0 +1,160 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric.base; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MetricNames { + + // Task 基础状态 + public static final String DINKY_FLINK_TASK_IS_RUNNING = "DINKY_FLINK_TASK_IS_RUNNING"; + + // 背压 + public static final String DINKY_FLINK_TASK_BACKPRESSURE_LEVEL = "DINKY_FLINK_TASK_BACKPRESSURE_LEVEL"; + public static final String DINKY_FLINK_TASK_BACKPRESSURE_RATE_MIN = "DINKY_FLINK_TASK_BACKPRESSURE_RATE_MIN"; + public static final String DINKY_FLINK_TASK_BACKPRESSURE_RATE_MAX = "DINKY_FLINK_TASK_BACKPRESSURE_RATE_MAX"; + + // ---------------- Vertice 常用监控指标 ---------------- + // 吞吐率 + public static final String DINKY_FLINK_TASK_NUM_RECORDS_IN = "DINKY_FLINK_TASK_NUM_RECORDS_IN"; + public static final String DINKY_FLINK_TASK_NUM_RECORDS_OUT = "DINKY_FLINK_TASK_NUM_RECORDS_OUT"; + public static final String DINKY_FLINK_TASK_NUM_RECORDS_IN_PER_SEC = "DINKY_FLINK_TASK_NUM_RECORDS_IN_PER_SEC"; + public static final String DINKY_FLINK_TASK_NUM_RECORDS_OUT_PER_SEC = "DINKY_FLINK_TASK_NUM_RECORDS_OUT_PER_SEC"; + + // 数据字节量 + public static final String DINKY_FLINK_TASK_NUM_BYTES_IN = "DINKY_FLINK_TASK_NUM_BYTES_IN"; + public static final String DINKY_FLINK_TASK_NUM_BYTES_OUT = "DINKY_FLINK_TASK_NUM_BYTES_OUT"; + public static final String DINKY_FLINK_TASK_NUM_BYTES_IN_PER_SEC = "DINKY_FLINK_TASK_NUM_BYTES_IN_PER_SEC"; + public static final String DINKY_FLINK_TASK_NUM_BYTES_OUT_PER_SEC = "DINKY_FLINK_TASK_NUM_BYTES_OUT_PER_SEC"; + + // 延迟 + public static final String DINKY_FLINK_TASK_LATENCY = "DINKY_FLINK_TASK_LATENCY"; // 通常是 histogram + public static final String DINKY_FLINK_TASK_WATERMARK = "DINKY_FLINK_TASK_WATERMARK"; + + // Checkpoint + public static final String DINKY_FLINK_TASK_CHECKPOINT_ALIGNMENT_TIME = + "DINKY_FLINK_TASK_CHECKPOINT_ALIGNMENT_TIME"; + public static final String DINKY_FLINK_TASK_CHECKPOINT_START_DELAY = "DINKY_FLINK_TASK_CHECKPOINT_START_DELAY"; + + // CPU 时间 + public static final String DINKY_FLINK_TASK_CPU_TIME = "DINKY_FLINK_TASK_CPU_TIME"; + + // Checkpoint + public static final String DINKY_FLINK_TASK_LAST_CHECKPOINT_SIZE = "DINKY_FLINK_TASK_LAST_CHECKPOINT_SIZE"; + public static final String DINKY_FLINK_TASK_LAST_CHECKPOINT_DURATION = "DINKY_FLINK_TASK_LAST_CHECKPOINT_DURATION"; + public static final String DINKY_FLINK_TASK_NUM_BYTES_IN_CHECKPOINTED = + "DINKY_FLINK_TASK_NUM_BYTES_IN_CHECKPOINTED"; + public static final String DINKY_FLINK_TASK_NUM_BYTES_OUT_CHECKPOINTED = + "DINKY_FLINK_TASK_NUM_BYTES_OUT_CHECKPOINTED"; + + // Kafka / Source Offset + public static final String DINKY_FLINK_TASK_CURRENT_OFFSET = "DINKY_FLINK_TASK_CURRENT_OFFSET"; + public static final String DINKY_FLINK_TASK_COMMITTED_OFFSET = "DINKY_FLINK_TASK_COMMITTED_OFFSET"; + public static final String DINKY_FLINK_TASK_RECORDS_LAG_MAX = "DINKY_FLINK_TASK_RECORDS_LAG_MAX"; + public static final String DINKY_FLINK_TASK_BYTES_CONSUMED_RATE = "DINKY_FLINK_TASK_BYTES_CONSUMED_RATE"; + public static final String DINKY_FLINK_TASK_RECORDS_CONSUMED_RATE = "DINKY_FLINK_TASK_RECORDS_CONSUMED_RATE"; + + // Buffer + public static final String DINKY_FLINK_TASK_BUFFERS_IN_POOL_USAGE = "DINKY_FLINK_TASK_BUFFERS_IN_POOL_USAGE"; + public static final String DINKY_FLINK_TASK_BUFFERS_OUT_POOL_USAGE = "DINKY_FLINK_TASK_BUFFERS_OUT_POOL_USAGE"; + public static final String DINKY_FLINK_TASK_BUFFERS_IN_QUEUE_LENGTH = "DINKY_FLINK_TASK_BUFFERS_IN_QUEUE_LENGTH"; + public static final String DINKY_FLINK_TASK_BUFFERS_OUT_QUEUE_LENGTH = "DINKY_FLINK_TASK_BUFFERS_OUT_QUEUE_LENGTH"; + + // 错误计数 + public static final String DINKY_FLINK_TASK_NUM_RECORDS_FAILED = "DINKY_FLINK_TASK_NUM_RECORDS_FAILED"; + public static final String DINKY_FLINK_TASK_NUM_RECORDS_IN_ERRORS = "DINKY_FLINK_TASK_NUM_RECORDS_IN_ERRORS"; + public static final String DINKY_FLINK_TASK_NUM_RECORDS_OUT_ERRORS = "DINKY_FLINK_TASK_NUM_RECORDS_OUT_ERRORS"; + + // CPU & 启动时间 & 重启次数 + public static final String DINKY_FLINK_TASK_CPU_LOAD = "DINKY_FLINK_TASK_CPU_LOAD"; + public static final String DINKY_FLINK_TASK_START_TIME = "DINKY_FLINK_TASK_START_TIME"; + public static final String DINKY_FLINK_TASK_NUM_RESTARTS = "DINKY_FLINK_TASK_NUM_RESTARTS"; + + public static final Map FLINK_VERTICE_MERTICS_MAP = new HashMap<>(); + + public static final List HUDI_METRICS_LIST = Arrays.asList( + DINKY_FLINK_TASK_NUM_RECORDS_IN, + DINKY_FLINK_TASK_NUM_RECORDS_OUT, + DINKY_FLINK_TASK_NUM_RECORDS_IN_PER_SEC, + DINKY_FLINK_TASK_NUM_RECORDS_OUT_PER_SEC); + + static { + // 吞吐率 + FLINK_VERTICE_MERTICS_MAP.put("numRecordsIn", DINKY_FLINK_TASK_NUM_RECORDS_IN); + FLINK_VERTICE_MERTICS_MAP.put("numRecordsOut", DINKY_FLINK_TASK_NUM_RECORDS_OUT); + FLINK_VERTICE_MERTICS_MAP.put("numRecordsInPerSecond", DINKY_FLINK_TASK_NUM_RECORDS_IN_PER_SEC); + FLINK_VERTICE_MERTICS_MAP.put("numRecordsOutPerSecond", DINKY_FLINK_TASK_NUM_RECORDS_OUT_PER_SEC); + + // 数据字节量 + FLINK_VERTICE_MERTICS_MAP.put("numBytesIn", DINKY_FLINK_TASK_NUM_BYTES_IN); + FLINK_VERTICE_MERTICS_MAP.put("numBytesOut", DINKY_FLINK_TASK_NUM_BYTES_OUT); + FLINK_VERTICE_MERTICS_MAP.put("numBytesInPerSecond", DINKY_FLINK_TASK_NUM_BYTES_IN_PER_SEC); + FLINK_VERTICE_MERTICS_MAP.put("numBytesOutPerSecond", DINKY_FLINK_TASK_NUM_BYTES_OUT_PER_SEC); + + // 延迟 & watermark + FLINK_VERTICE_MERTICS_MAP.put("latency", DINKY_FLINK_TASK_LATENCY); + FLINK_VERTICE_MERTICS_MAP.put("currentOutputWatermark", DINKY_FLINK_TASK_WATERMARK); + + // checkpoint + FLINK_VERTICE_MERTICS_MAP.put("checkpointAlignmentTime", DINKY_FLINK_TASK_CHECKPOINT_ALIGNMENT_TIME); + FLINK_VERTICE_MERTICS_MAP.put("checkpointStartDelay", DINKY_FLINK_TASK_CHECKPOINT_START_DELAY); + FLINK_VERTICE_MERTICS_MAP.put("lastCheckpointDuration", DINKY_FLINK_TASK_LAST_CHECKPOINT_DURATION); + + // 内存 + // FLINK_VERTICE_MERTICS_MAP.put("memUsed", DINKY_FLINK_TASK_MEM_USED); + FLINK_VERTICE_MERTICS_MAP.put("buffers.inPoolUsage", DINKY_FLINK_TASK_BUFFERS_IN_POOL_USAGE); + FLINK_VERTICE_MERTICS_MAP.put("buffers.outPoolUsage", DINKY_FLINK_TASK_BUFFERS_OUT_POOL_USAGE); + + // CPU + FLINK_VERTICE_MERTICS_MAP.put("CpuTime", DINKY_FLINK_TASK_CPU_TIME); + + // Checkpoint + FLINK_VERTICE_MERTICS_MAP.put("lastCheckpointSize", DINKY_FLINK_TASK_LAST_CHECKPOINT_SIZE); + FLINK_VERTICE_MERTICS_MAP.put("lastCheckpointDuration", DINKY_FLINK_TASK_LAST_CHECKPOINT_DURATION); + FLINK_VERTICE_MERTICS_MAP.put("numBytesInCheckpointed", DINKY_FLINK_TASK_NUM_BYTES_IN_CHECKPOINTED); + FLINK_VERTICE_MERTICS_MAP.put("numBytesOutCheckpointed", DINKY_FLINK_TASK_NUM_BYTES_OUT_CHECKPOINTED); + + // Kafka / Source Offset + FLINK_VERTICE_MERTICS_MAP.put("currentOffset", DINKY_FLINK_TASK_CURRENT_OFFSET); + FLINK_VERTICE_MERTICS_MAP.put("committedOffset", DINKY_FLINK_TASK_COMMITTED_OFFSET); + FLINK_VERTICE_MERTICS_MAP.put("records-lag-max", DINKY_FLINK_TASK_RECORDS_LAG_MAX); + FLINK_VERTICE_MERTICS_MAP.put("bytes-consumed-rate", DINKY_FLINK_TASK_BYTES_CONSUMED_RATE); + FLINK_VERTICE_MERTICS_MAP.put("records-consumed-rate", DINKY_FLINK_TASK_RECORDS_CONSUMED_RATE); + + // Buffer + FLINK_VERTICE_MERTICS_MAP.put("buffers.inPoolUsage", DINKY_FLINK_TASK_BUFFERS_IN_POOL_USAGE); + FLINK_VERTICE_MERTICS_MAP.put("buffers.outPoolUsage", DINKY_FLINK_TASK_BUFFERS_OUT_POOL_USAGE); + FLINK_VERTICE_MERTICS_MAP.put("buffers.inQueueLength", DINKY_FLINK_TASK_BUFFERS_IN_QUEUE_LENGTH); + FLINK_VERTICE_MERTICS_MAP.put("buffers.outQueueLength", DINKY_FLINK_TASK_BUFFERS_OUT_QUEUE_LENGTH); + + // 错误计数 + FLINK_VERTICE_MERTICS_MAP.put("numRecordsFailed", DINKY_FLINK_TASK_NUM_RECORDS_FAILED); + FLINK_VERTICE_MERTICS_MAP.put("numRecordsInErrors", DINKY_FLINK_TASK_NUM_RECORDS_IN_ERRORS); + FLINK_VERTICE_MERTICS_MAP.put("numRecordsOutErrors", DINKY_FLINK_TASK_NUM_RECORDS_OUT_ERRORS); + + // CPU & 启动时间 & 重启次数 + FLINK_VERTICE_MERTICS_MAP.put("cpuLoad", DINKY_FLINK_TASK_CPU_LOAD); + FLINK_VERTICE_MERTICS_MAP.put("taskStartTime", DINKY_FLINK_TASK_START_TIME); + FLINK_VERTICE_MERTICS_MAP.put("numRestarts", DINKY_FLINK_TASK_NUM_RESTARTS); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java new file mode 100644 index 0000000000..3d3739c98c --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java @@ -0,0 +1,371 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric.base; + +import org.dinky.api.FlinkAPI; +import org.dinky.daemon.pool.ScheduleThreadPool; +import org.dinky.data.enums.JobStatus; +import org.dinky.data.model.Task; +import org.dinky.data.model.ext.JobInfoDetail; +import org.dinky.data.model.job.JobInstance; +import org.dinky.service.JobInstanceService; +import org.dinky.service.TaskService; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.support.PeriodicTrigger; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class MetricService { + + @Autowired + private JobInstanceService jobInstanceService; + + @Autowired + private TaskService taskService; + + @Autowired + private ScheduleThreadPool schedule; + + private HashMap metricCaches = new HashMap<>(); + + private boolean isScheduleStart = false; + + protected abstract T formatFlinkJobMetrics(List> metrics); + + protected abstract T mergeFlinkJobMetrics(List metricGroups); + + public T retrieveAllFlinkJobMetrics(List types) { + if (!this.isScheduleStart) { + synchronized (this) { + if (!this.isScheduleStart) { + this.isScheduleStart = true; + schedule.addSchedule( + "retrieve.all.flink.status.metrics", + () -> this.runRetrieveAllFlinkJobMetrics(MetricType.STATUS), + new PeriodicTrigger(1, TimeUnit.MINUTES)); + schedule.addSchedule( + "retrieve.all.flink.job.metrics", + () -> this.runRetrieveAllFlinkJobMetrics(MetricType.JOBMANAGER), + new PeriodicTrigger(1, TimeUnit.MINUTES)); + schedule.addSchedule( + "retrieve.all.flink.task.metrics", + () -> this.runRetrieveAllFlinkJobMetrics(MetricType.TASKMANAGER), + new PeriodicTrigger(1, TimeUnit.MINUTES)); + schedule.addSchedule( + "retrieve.all.flink.vertices.metrics", + () -> this.runRetrieveAllFlinkJobMetrics(MetricType.VERTICES), + new PeriodicTrigger(2, TimeUnit.MINUTES)); + } + } + } + List _types = types == null || types.isEmpty() + ? Arrays.asList(MetricType.STATUS, MetricType.JOBMANAGER, MetricType.TASKMANAGER, MetricType.VERTICES) + : types; + List metricGroups = this.metricCaches.entrySet().stream() + .filter(e -> _types.contains(e.getKey())) + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + return mergeFlinkJobMetrics(metricGroups); + } + + private void runRetrieveAllFlinkJobMetrics(MetricType type) { + List jobInstances = jobInstanceService.listAllJobInstances(); + log.info("jobInstance count: {}", jobInstances.size()); + T allMetrics = this.runRetrieveAllFlinkJobMetricsWithJobList(jobInstances, type); + this.metricCaches.put(type, allMetrics); + } + + private T runRetrieveAllFlinkJobMetricsWithJobList(List jobInstances, MetricType type) { + List> metrics = new ArrayList<>(); + for (JobInstance jobInstance : jobInstances) { + try { + JobInfoDetail jobInfoDetail = jobInstanceService.getJobInfoDetail(jobInstance.getId()); + if (type == MetricType.STATUS) { + metrics.add(this.retrieveFlinkJobStatusMetricByJobInfoDetail(jobInfoDetail)); + } else if (jobInstance.getStatus().equals(JobStatus.RUNNING.toString())) { + switch (type) { + case JOBMANAGER: + metrics.addAll(this.retrieveTaskManagerMetricsByJobInfoDetail(jobInfoDetail)); + break; + case TASKMANAGER: + metrics.addAll(this.retrieveJobManagerMetricsByJobInfoDetail(jobInfoDetail)); + break; + case VERTICES: + metrics.addAll(this.retrieveJobVerticesMetricsByJobInfoDetail(jobInfoDetail)); + break; + } + } + } catch (Exception e) { + log.error("JobInstance {} failed: {}", jobInstance.getId(), e.getMessage(), e); + } + } + return this.formatFlinkJobMetrics(metrics); + } + + private HashMap retrieveFlinkJobStatusMetricByJobInfoDetail(JobInfoDetail jobInfoDetail) { + HashMap metric = this.retrieveJobInfoMetrics(jobInfoDetail); + metric.put("name", MetricNames.DINKY_FLINK_TASK_IS_RUNNING); + metric.put("value", jobInfoDetail.getInstance().getStatus().equals(JobStatus.RUNNING.toString()) ? 1 : 0); + return metric; + } + + private List> retrieveTaskManagerMetricsByJobInfoDetail(JobInfoDetail jobInfoDetail) { + + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + List taskManagers = + Lists.newArrayList(flinkAPI.getTaskManagers().get("taskmanagers")); + return taskManagers.parallelStream() + .flatMap(taskManager -> { + List> metricList = new ArrayList(); + String taskManagerId = taskManager.get("id").asText(); + JsonNode taskManagerMetrics = flinkAPI.getTaskManagerMetrics(taskManagerId); + for (JsonNode metric : Lists.newArrayList(taskManagerMetrics)) { + HashMap other = new HashMap<>(); + other.put( + MetricKeys.DINKY_FLINK_TASK_MANAGER_ID, + taskManagerId.substring( + jobInfoDetail.getInstance().getName().length() + 1)); + metricList.add(buildMetric(jobInfoDetail, metric, other)); + } + return metricList.stream(); + }) + .collect(Collectors.toList()); + } + + private List> retrieveJobManagerMetricsByJobInfoDetail(JobInfoDetail jobInfoDetail) { + List> metricList = new ArrayList(); + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + JsonNode jobManagerMetrics = flinkAPI.getJobManagerMetrics(); + for (JsonNode metric : Lists.newArrayList(jobManagerMetrics)) { + metricList.add(buildMetric(jobInfoDetail, metric)); + } + return metricList; + } + + private List> retrieveJobVerticesMetricsByJobInfoDetail(JobInfoDetail jobInfoDetail) { + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + String jid = jobInfoDetail.getInstance().getJid(); + return flinkAPI.getVertices(jid).parallelStream() + .flatMap(vertice -> { + List> metricList = new ArrayList<>(); + HashMap> whitelist = + this.getWhitelistMetricsFromJobVertices(jobInfoDetail, vertice); + if (!whitelist.isEmpty()) { + HashMap other = new HashMap<>(); + other.put(MetricKeys.DINKY_FLINK_TASK_VERTICE_ID, vertice); + for (String name : whitelist.keySet()) { + List verticeMetrics = new ArrayList<>(); + for (List metricGroup : Lists.partition(whitelist.get(name), 15)) { + JsonNode groupMetrics = + flinkAPI.getJobMetricsData(jid, vertice, String.join(",", metricGroup)); + verticeMetrics.addAll(Lists.newArrayList(groupMetrics)); + } + long value = this.getVerticeMetricValueByMetricName(name, verticeMetrics); + metricList.add(buildMetric(jobInfoDetail, name, value, other)); + } + } + metricList.addAll( + this.retrieveJobVerticeBackPressureMetricsByJobInfoDetail(jobInfoDetail, vertice)); + return metricList.stream(); + }) + .collect(Collectors.toList()); + } + + private long getVerticeMetricValueByMetricName(String dinkyMetricName, List verticeMetrics) { + List values = new ArrayList<>(); + for (JsonNode metric : verticeMetrics) { + if (metric.has("value") && !metric.get("value").isNull()) { + values.add(metric.get("value").asLong()); + } + } + if (values.isEmpty()) { + return 0L; + } + switch (dinkyMetricName) { + case MetricNames.DINKY_FLINK_TASK_RECORDS_CONSUMED_RATE: + case MetricNames.DINKY_FLINK_TASK_BYTES_CONSUMED_RATE: + return (long) values.stream().mapToLong(Long::longValue).sum(); + + case MetricNames.DINKY_FLINK_TASK_RECORDS_LAG_MAX: + return values.stream().mapToLong(Long::longValue).max().orElse(0); + + case MetricNames.DINKY_FLINK_TASK_CURRENT_OFFSET: + case MetricNames.DINKY_FLINK_TASK_COMMITTED_OFFSET: + return values.stream().mapToLong(Long::longValue).max().orElse(0); + + case MetricNames.DINKY_FLINK_TASK_LAST_CHECKPOINT_DURATION: + case MetricNames.DINKY_FLINK_TASK_LAST_CHECKPOINT_SIZE: + return values.stream().mapToLong(Long::longValue).max().orElse(0); + + default: + return (long) values.stream().mapToLong(Long::longValue).sum(); + } + } + + private HashMap> getWhitelistMetricsFromJobVertices( + JobInfoDetail jobInfoDetail, String vertice) { + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + JsonNode metrics = + flinkAPI.getJobMetricsItems(jobInfoDetail.getInstance().getJid(), vertice); + + HashMap> whitelist = new HashMap<>(); + for (JsonNode metric : metrics) { + String id = metric.get("id").asText(); + MetricNames.FLINK_VERTICE_MERTICS_MAP.keySet().stream().forEach(m -> { + if (id.contains(m)) { + String name = MetricNames.FLINK_VERTICE_MERTICS_MAP.get(m); + if (MetricNames.HUDI_METRICS_LIST.stream().anyMatch(name::equals) && id.contains("hoodie")) { + name = name + "_HUDI"; + } + whitelist.computeIfAbsent(name, k -> new ArrayList<>()).add(id); + } + }); + } + return whitelist; + } + + private HashMap buildMetric( + JobInfoDetail jobInfoDetail, String name, long value, HashMap other) { + HashMap base = this.retrieveJobInfoMetrics(jobInfoDetail); + HashMap m = new HashMap<>(base); + m.put("name", name); + m.put("value", value); + if (other != null) { + m.putAll(other); + } + return m; + } + + private HashMap buildMetric( + JobInfoDetail jobInfoDetail, JsonNode metric, HashMap other) { + HashMap base = this.retrieveJobInfoMetrics(jobInfoDetail); + HashMap m = new HashMap<>(base); + String name = + "DINKY_FLINK_JOB_" + metric.get("id").asText().replace(".", "_").toUpperCase(); + long value = metric.get("value").asLong(); + m.put("name", name); + m.put("value", value); + if (other != null) { + m.putAll(other); + } + return m; + } + + private HashMap buildMetric(JobInfoDetail jobInfoDetail, JsonNode metric) { + return this.buildMetric(jobInfoDetail, metric, null); + } + + private List> retrieveJobVerticeBackPressureMetricsByJobInfoDetail( + JobInfoDetail jobInfoDetail, String vertice) { + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + JsonNode backPressure = + flinkAPI.getBackPressureJson(jobInfoDetail.getInstance().getJid(), vertice); + + int backpressureLevel = mapJobVerticeBackPressureLevel( + backPressure.get("backpressureLevel").asText()); + + List ratios = StreamSupport.stream(backPressure.get("subtasks").spliterator(), false) + .map(subtask -> subtask.get("ratio").asDouble()) + .collect(Collectors.toList()); + + double backpressureRateMin = + ratios.stream().mapToDouble(Double::doubleValue).min().orElse(0.0); + double backpressureRateMax = + ratios.stream().mapToDouble(Double::doubleValue).max().orElse(1.0); + + HashMap baseMetrics = this.retrieveJobInfoMetrics(jobInfoDetail); + + List> metricList = new ArrayList<>(); + Map metricsMap = new HashMap<>(); + metricsMap.put(MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_LEVEL, backpressureLevel); + metricsMap.put(MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_RATE_MAX, backpressureRateMax); + metricsMap.put(MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_RATE_MIN, backpressureRateMin); + + metricsMap.forEach((name, value) -> { + HashMap metric = new HashMap<>(baseMetrics); + metric.put("name", name); + metric.put("value", value); + metric.put(MetricKeys.DINKY_FLINK_TASK_VERTICE_ID, vertice); + metricList.add(metric); + }); + + return metricList; + } + + private int mapJobVerticeBackPressureLevel(String backpressureLevel) { + switch (backpressureLevel.toLowerCase()) { + case "ok": + return 1; + case "low": + return 2; + case "medium": + return 3; + case "high": + return 4; + default: + return 0; + } + } + + public HashMap retrieveJobInfoMetrics(JobInfoDetail jobInfoDetail) { + HashMap metrics = new HashMap(); + + if (taskService != null) { + Task task = taskService.getById(jobInfoDetail.getInstance().getTaskId()); + + if (task != null) { + metrics.put( + MetricKeys.DINKY_FLINK_TASK_STATUS, + jobInfoDetail.getInstance().getStatus()); + metrics.put(MetricKeys.DINKY_FLINK_TASK_DEPLOY_STATUS, task.getStep()); + metrics.put(MetricKeys.DINKY_FLINK_TASK_NAME, task.getName()); + } + } + + if (!metrics.containsKey(MetricKeys.DINKY_FLINK_TASK_NAME)) { + metrics.put( + MetricKeys.DINKY_FLINK_TASK_NAME, + jobInfoDetail.getInstance().getName()); + } + + metrics.put(MetricKeys.DINKY_FLINK_TASK_ID, jobInfoDetail.getInstance().getTaskId()); + + if (jobInfoDetail.getClusterConfiguration() != null) { + metrics.put( + MetricKeys.DINKY_FLINK_CLUSTER_NAME, + jobInfoDetail.getClusterConfiguration().getName()); + } + return metrics; + } +} diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricType.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricType.java new file mode 100644 index 0000000000..5774f2b7f8 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricType.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric.base; + +public enum MetricType { + STATUS(0), + JOBMANAGER(1), + TASKMANAGER(2), + VERTICES(3); + + private final int value; + + MetricType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } +} diff --git a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java index 4993fbccac..d9b24d0b21 100644 --- a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java +++ b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java @@ -61,6 +61,8 @@ public interface JobInstanceService extends ISuperService { */ List listJobInstanceActive(); + List listAllJobInstances(); + /** * Get the job information detail for the given ID. * diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java index 236942209e..6c23a8c843 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java @@ -148,6 +148,11 @@ public List listJobInstanceActive() { return baseMapper.listJobInstanceActive(); } + @Override + public List listAllJobInstances() { + return baseMapper.listAllJobInstances(); + } + @Override public JobInfoDetail getJobInfoDetail(Integer id) { if (Asserts.isNull(TenantContextHolder.get())) { diff --git a/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml b/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml index a5ee9ed2dd..977a0ac776 100644 --- a/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml +++ b/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml @@ -103,4 +103,16 @@ from dinky_job_instance where id = #{id} + + diff --git a/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java b/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java index 4e7fcf0593..16a2095c12 100644 --- a/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java +++ b/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java @@ -353,7 +353,7 @@ public JsonNode getTaskManagerMetrics(String containerId) { + containerId + FlinkRestAPIConstant.METRICS + FlinkRestAPIConstant.GET - + buildMetricsParams(FlinkRestAPIConstant.JOB_MANAGER)); + + buildMetricsParams(FlinkRestAPIConstant.TASK_MANAGER)); } /** @@ -406,6 +406,27 @@ public JsonNode getJobMetricsData(String jobId, String verticeId, String metrics + FlinkRestAPIConstant.METRICS + "?get=" + URLEncodeUtil.encode(metrics)); } + public JsonNode getJobVerticeMetrics(String jobId, String verticeId) { + JsonNode metrics = this.getJobMetricsItems(jobId, verticeId); + StringBuilder sb = new StringBuilder(); + for (JsonNode node : metrics) { + if (Asserts.isNull(node)) { + continue; + } + + final JsonNode id = node.get(ID); + if (Asserts.isNull(id)) { + continue; + } + + if (sb.length() > 0) { + sb.append(","); + } + sb.append(id.asText()); + } + return this.getJobMetricsData(jobId, verticeId, sb.toString()); + } + /** * GET backpressure */ @@ -417,6 +438,14 @@ public String getBackPressure(String jobId, String verticeId) { + FlinkRestAPIConstant.BACKPRESSURE); } + public JsonNode getBackPressureJson(String jobId, String verticeId) { + return get(FlinkRestAPIConstant.JOBS + + jobId + + FlinkRestAPIConstant.VERTICES + + verticeId + + FlinkRestAPIConstant.BACKPRESSURE); + } + /** * GET watermark */