From 29632e6805a9118c2c6511feb36a07d6ab136f5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E8=B4=B5=E5=8F=91?= Date: Thu, 11 Sep 2025 11:27:53 +0800 Subject: [PATCH 1/5] [dinky-admin] Add support for exporting Prometheus monitoring data --- .../dinky/controller/MetricController.java | 53 +++ .../org/dinky/mapper/JobInstanceMapper.java | 3 + .../org/dinky/metric/PrometheusService.java | 69 ++++ .../org/dinky/metric/base/MetricKeys.java | 33 ++ .../org/dinky/metric/base/MetricNames.java | 160 +++++++++ .../org/dinky/metric/base/MetricService.java | 338 ++++++++++++++++++ .../org/dinky/service/JobInstanceService.java | 2 + .../service/impl/JobInstanceServiceImpl.java | 5 + .../resources/mapper/JobInstanceMapper.xml | 12 + .../src/main/java/org/dinky/api/FlinkAPI.java | 31 +- 10 files changed, 705 insertions(+), 1 deletion(-) create mode 100644 dinky-admin/src/main/java/org/dinky/controller/MetricController.java create mode 100644 dinky-admin/src/main/java/org/dinky/metric/PrometheusService.java create mode 100644 dinky-admin/src/main/java/org/dinky/metric/base/MetricKeys.java create mode 100644 dinky-admin/src/main/java/org/dinky/metric/base/MetricNames.java create mode 100644 dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java diff --git a/dinky-admin/src/main/java/org/dinky/controller/MetricController.java b/dinky-admin/src/main/java/org/dinky/controller/MetricController.java new file mode 100644 index 0000000000..bf56394228 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/controller/MetricController.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.controller; + +import org.dinky.metric.PrometheusService; + +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import cn.dev33.satoken.annotation.SaIgnore; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * MetricController + * + * @since 2025/9/4 11:32 + */ +@Slf4j +@Api(tags = "Metrric Controller") +@RestController +@RequestMapping("/metric") +@RequiredArgsConstructor +@SaIgnore +public class MetricController { + + private final PrometheusService prometheusService; + + @GetMapping(value = "/prometheus", produces = MediaType.TEXT_PLAIN_VALUE) + public String prometheus() { + return prometheusService.retrieveAllFlinkJobMetrics(); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java b/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java index 55fd072a3e..343156193c 100644 --- a/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java +++ b/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java @@ -49,6 +49,9 @@ public interface JobInstanceMapper extends SuperMapper { @InterceptorIgnore(tenantLine = "true") List listJobInstanceActive(); + @InterceptorIgnore(tenantLine = "true") + List listAllJobInstances(); + JobInstance getJobInstanceByTaskId(Integer id); @InterceptorIgnore(tenantLine = "true") diff --git a/dinky-admin/src/main/java/org/dinky/metric/PrometheusService.java b/dinky-admin/src/main/java/org/dinky/metric/PrometheusService.java new file mode 100644 index 0000000000..d0c889696d --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/PrometheusService.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric; + +import org.dinky.metric.base.MetricService; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.stereotype.Service; + +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class PrometheusService extends MetricService { + + @Override + protected String formatFlinkJobMetrics(List> metrics) { + StringBuilder sb = new StringBuilder(); + + for (HashMap metric : metrics) { + if (!metric.containsKey("name") || !metric.containsKey("value")) { + continue; + } + + String name = metric.get("name").toString().toLowerCase(); + Object value = metric.get("value"); + + StringBuilder labels = new StringBuilder(); + for (Map.Entry entry : metric.entrySet()) { + String key = entry.getKey(); + if ("name".equals(key) || "value".equals(key)) { + continue; + } + if (labels.length() > 0) { + labels.append(","); + } + labels.append(key).append("=\"").append(entry.getValue()).append("\""); + } + + sb.append(name); + if (labels.length() > 0) { + sb.append("{").append(labels).append("}"); + } + sb.append(" ").append(value).append("\n"); + } + + return sb.toString(); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricKeys.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricKeys.java new file mode 100644 index 0000000000..45ed39838e --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricKeys.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric.base; + +public class MetricKeys { + + public static final String DINKY_FLINK_TASK_ID = "task_id"; + public static final String DINKY_FLINK_TASK_NAME = "task_name"; + public static final String DINKY_FLINK_TASK_STATUS = "task_status"; + + public static final String DINKY_FLINK_TASK_MANAGER_ID = "task_manager_id"; + + public static final String DINKY_FLINK_CLUSTER_NAME = "task_cluster"; + + public static final String DINKY_FLINK_TASK_VERTICE_ID = "task_vertice_id"; +} diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricNames.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricNames.java new file mode 100644 index 0000000000..50909a8662 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricNames.java @@ -0,0 +1,160 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric.base; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MetricNames { + + // Task 基础状态 + public static final String DINKY_FLINK_TASK_IS_RUNNING = "DINKY_FLINK_TASK_IS_RUNNING"; + + // 背压 + public static final String DINKY_FLINK_TASK_BACKPRESSURE_LEVEL = "DINKY_FLINK_TASK_BACKPRESSURE_LEVEL"; + public static final String DINKY_FLINK_TASK_BACKPRESSURE_RATE_MIN = "DINKY_FLINK_TASK_BACKPRESSURE_RATE_MIN"; + public static final String DINKY_FLINK_TASK_BACKPRESSURE_RATE_MAX = "DINKY_FLINK_TASK_BACKPRESSURE_RATE_MAX"; + + // ---------------- Vertice 常用监控指标 ---------------- + // 吞吐率 + public static final String DINKY_FLINK_TASK_NUM_RECORDS_IN = "DINKY_FLINK_TASK_NUM_RECORDS_IN"; + public static final String DINKY_FLINK_TASK_NUM_RECORDS_OUT = "DINKY_FLINK_TASK_NUM_RECORDS_OUT"; + public static final String DINKY_FLINK_TASK_NUM_RECORDS_IN_PER_SEC = "DINKY_FLINK_TASK_NUM_RECORDS_IN_PER_SEC"; + public static final String DINKY_FLINK_TASK_NUM_RECORDS_OUT_PER_SEC = "DINKY_FLINK_TASK_NUM_RECORDS_OUT_PER_SEC"; + + // 数据字节量 + public static final String DINKY_FLINK_TASK_NUM_BYTES_IN = "DINKY_FLINK_TASK_NUM_BYTES_IN"; + public static final String DINKY_FLINK_TASK_NUM_BYTES_OUT = "DINKY_FLINK_TASK_NUM_BYTES_OUT"; + public static final String DINKY_FLINK_TASK_NUM_BYTES_IN_PER_SEC = "DINKY_FLINK_TASK_NUM_BYTES_IN_PER_SEC"; + public static final String DINKY_FLINK_TASK_NUM_BYTES_OUT_PER_SEC = "DINKY_FLINK_TASK_NUM_BYTES_OUT_PER_SEC"; + + // 延迟 + public static final String DINKY_FLINK_TASK_LATENCY = "DINKY_FLINK_TASK_LATENCY"; // 通常是 histogram + public static final String DINKY_FLINK_TASK_WATERMARK = "DINKY_FLINK_TASK_WATERMARK"; + + // Checkpoint + public static final String DINKY_FLINK_TASK_CHECKPOINT_ALIGNMENT_TIME = + "DINKY_FLINK_TASK_CHECKPOINT_ALIGNMENT_TIME"; + public static final String DINKY_FLINK_TASK_CHECKPOINT_START_DELAY = "DINKY_FLINK_TASK_CHECKPOINT_START_DELAY"; + + // CPU 时间 + public static final String DINKY_FLINK_TASK_CPU_TIME = "DINKY_FLINK_TASK_CPU_TIME"; + + // Checkpoint + public static final String DINKY_FLINK_TASK_LAST_CHECKPOINT_SIZE = "DINKY_FLINK_TASK_LAST_CHECKPOINT_SIZE"; + public static final String DINKY_FLINK_TASK_LAST_CHECKPOINT_DURATION = "DINKY_FLINK_TASK_LAST_CHECKPOINT_DURATION"; + public static final String DINKY_FLINK_TASK_NUM_BYTES_IN_CHECKPOINTED = + "DINKY_FLINK_TASK_NUM_BYTES_IN_CHECKPOINTED"; + public static final String DINKY_FLINK_TASK_NUM_BYTES_OUT_CHECKPOINTED = + "DINKY_FLINK_TASK_NUM_BYTES_OUT_CHECKPOINTED"; + + // Kafka / Source Offset + public static final String DINKY_FLINK_TASK_CURRENT_OFFSET = "DINKY_FLINK_TASK_CURRENT_OFFSET"; + public static final String DINKY_FLINK_TASK_COMMITTED_OFFSET = "DINKY_FLINK_TASK_COMMITTED_OFFSET"; + public static final String DINKY_FLINK_TASK_RECORDS_LAG_MAX = "DINKY_FLINK_TASK_RECORDS_LAG_MAX"; + public static final String DINKY_FLINK_TASK_BYTES_CONSUMED_RATE = "DINKY_FLINK_TASK_BYTES_CONSUMED_RATE"; + public static final String DINKY_FLINK_TASK_RECORDS_CONSUMED_RATE = "DINKY_FLINK_TASK_RECORDS_CONSUMED_RATE"; + + // Buffer + public static final String DINKY_FLINK_TASK_BUFFERS_IN_POOL_USAGE = "DINKY_FLINK_TASK_BUFFERS_IN_POOL_USAGE"; + public static final String DINKY_FLINK_TASK_BUFFERS_OUT_POOL_USAGE = "DINKY_FLINK_TASK_BUFFERS_OUT_POOL_USAGE"; + public static final String DINKY_FLINK_TASK_BUFFERS_IN_QUEUE_LENGTH = "DINKY_FLINK_TASK_BUFFERS_IN_QUEUE_LENGTH"; + public static final String DINKY_FLINK_TASK_BUFFERS_OUT_QUEUE_LENGTH = "DINKY_FLINK_TASK_BUFFERS_OUT_QUEUE_LENGTH"; + + // 错误计数 + public static final String DINKY_FLINK_TASK_NUM_RECORDS_FAILED = "DINKY_FLINK_TASK_NUM_RECORDS_FAILED"; + public static final String DINKY_FLINK_TASK_NUM_RECORDS_IN_ERRORS = "DINKY_FLINK_TASK_NUM_RECORDS_IN_ERRORS"; + public static final String DINKY_FLINK_TASK_NUM_RECORDS_OUT_ERRORS = "DINKY_FLINK_TASK_NUM_RECORDS_OUT_ERRORS"; + + // CPU & 启动时间 & 重启次数 + public static final String DINKY_FLINK_TASK_CPU_LOAD = "DINKY_FLINK_TASK_CPU_LOAD"; + public static final String DINKY_FLINK_TASK_START_TIME = "DINKY_FLINK_TASK_START_TIME"; + public static final String DINKY_FLINK_TASK_NUM_RESTARTS = "DINKY_FLINK_TASK_NUM_RESTARTS"; + + public static final Map FLINK_VERTICE_MERTICS_MAP = new HashMap<>(); + + public static final List HUDI_METRICS_LIST = Arrays.asList( + DINKY_FLINK_TASK_NUM_RECORDS_IN, + DINKY_FLINK_TASK_NUM_RECORDS_OUT, + DINKY_FLINK_TASK_NUM_RECORDS_IN_PER_SEC, + DINKY_FLINK_TASK_NUM_RECORDS_OUT_PER_SEC); + + static { + // 吞吐率 + FLINK_VERTICE_MERTICS_MAP.put("numRecordsIn", DINKY_FLINK_TASK_NUM_RECORDS_IN); + FLINK_VERTICE_MERTICS_MAP.put("numRecordsOut", DINKY_FLINK_TASK_NUM_RECORDS_OUT); + FLINK_VERTICE_MERTICS_MAP.put("numRecordsInPerSecond", DINKY_FLINK_TASK_NUM_RECORDS_IN_PER_SEC); + FLINK_VERTICE_MERTICS_MAP.put("numRecordsOutPerSecond", DINKY_FLINK_TASK_NUM_RECORDS_OUT_PER_SEC); + + // 数据字节量 + FLINK_VERTICE_MERTICS_MAP.put("numBytesIn", DINKY_FLINK_TASK_NUM_BYTES_IN); + FLINK_VERTICE_MERTICS_MAP.put("numBytesOut", DINKY_FLINK_TASK_NUM_BYTES_OUT); + FLINK_VERTICE_MERTICS_MAP.put("numBytesInPerSecond", DINKY_FLINK_TASK_NUM_BYTES_IN_PER_SEC); + FLINK_VERTICE_MERTICS_MAP.put("numBytesOutPerSecond", DINKY_FLINK_TASK_NUM_BYTES_OUT_PER_SEC); + + // 延迟 & watermark + FLINK_VERTICE_MERTICS_MAP.put("latency", DINKY_FLINK_TASK_LATENCY); + FLINK_VERTICE_MERTICS_MAP.put("currentOutputWatermark", DINKY_FLINK_TASK_WATERMARK); + + // checkpoint + FLINK_VERTICE_MERTICS_MAP.put("checkpointAlignmentTime", DINKY_FLINK_TASK_CHECKPOINT_ALIGNMENT_TIME); + FLINK_VERTICE_MERTICS_MAP.put("checkpointStartDelay", DINKY_FLINK_TASK_CHECKPOINT_START_DELAY); + FLINK_VERTICE_MERTICS_MAP.put("lastCheckpointDuration", DINKY_FLINK_TASK_LAST_CHECKPOINT_DURATION); + + // 内存 + // FLINK_VERTICE_MERTICS_MAP.put("memUsed", DINKY_FLINK_TASK_MEM_USED); + FLINK_VERTICE_MERTICS_MAP.put("buffers.inPoolUsage", DINKY_FLINK_TASK_BUFFERS_IN_POOL_USAGE); + FLINK_VERTICE_MERTICS_MAP.put("buffers.outPoolUsage", DINKY_FLINK_TASK_BUFFERS_OUT_POOL_USAGE); + + // CPU + FLINK_VERTICE_MERTICS_MAP.put("CpuTime", DINKY_FLINK_TASK_CPU_TIME); + + // Checkpoint + FLINK_VERTICE_MERTICS_MAP.put("lastCheckpointSize", DINKY_FLINK_TASK_LAST_CHECKPOINT_SIZE); + FLINK_VERTICE_MERTICS_MAP.put("lastCheckpointDuration", DINKY_FLINK_TASK_LAST_CHECKPOINT_DURATION); + FLINK_VERTICE_MERTICS_MAP.put("numBytesInCheckpointed", DINKY_FLINK_TASK_NUM_BYTES_IN_CHECKPOINTED); + FLINK_VERTICE_MERTICS_MAP.put("numBytesOutCheckpointed", DINKY_FLINK_TASK_NUM_BYTES_OUT_CHECKPOINTED); + + // Kafka / Source Offset + FLINK_VERTICE_MERTICS_MAP.put("currentOffset", DINKY_FLINK_TASK_CURRENT_OFFSET); + FLINK_VERTICE_MERTICS_MAP.put("committedOffset", DINKY_FLINK_TASK_COMMITTED_OFFSET); + FLINK_VERTICE_MERTICS_MAP.put("records-lag-max", DINKY_FLINK_TASK_RECORDS_LAG_MAX); + FLINK_VERTICE_MERTICS_MAP.put("bytes-consumed-rate", DINKY_FLINK_TASK_BYTES_CONSUMED_RATE); + FLINK_VERTICE_MERTICS_MAP.put("records-consumed-rate", DINKY_FLINK_TASK_RECORDS_CONSUMED_RATE); + + // Buffer + FLINK_VERTICE_MERTICS_MAP.put("buffers.inPoolUsage", DINKY_FLINK_TASK_BUFFERS_IN_POOL_USAGE); + FLINK_VERTICE_MERTICS_MAP.put("buffers.outPoolUsage", DINKY_FLINK_TASK_BUFFERS_OUT_POOL_USAGE); + FLINK_VERTICE_MERTICS_MAP.put("buffers.inQueueLength", DINKY_FLINK_TASK_BUFFERS_IN_QUEUE_LENGTH); + FLINK_VERTICE_MERTICS_MAP.put("buffers.outQueueLength", DINKY_FLINK_TASK_BUFFERS_OUT_QUEUE_LENGTH); + + // 错误计数 + FLINK_VERTICE_MERTICS_MAP.put("numRecordsFailed", DINKY_FLINK_TASK_NUM_RECORDS_FAILED); + FLINK_VERTICE_MERTICS_MAP.put("numRecordsInErrors", DINKY_FLINK_TASK_NUM_RECORDS_IN_ERRORS); + FLINK_VERTICE_MERTICS_MAP.put("numRecordsOutErrors", DINKY_FLINK_TASK_NUM_RECORDS_OUT_ERRORS); + + // CPU & 启动时间 & 重启次数 + FLINK_VERTICE_MERTICS_MAP.put("cpuLoad", DINKY_FLINK_TASK_CPU_LOAD); + FLINK_VERTICE_MERTICS_MAP.put("taskStartTime", DINKY_FLINK_TASK_START_TIME); + FLINK_VERTICE_MERTICS_MAP.put("numRestarts", DINKY_FLINK_TASK_NUM_RESTARTS); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java new file mode 100644 index 0000000000..c711e4cf85 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java @@ -0,0 +1,338 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric.base; + +import org.dinky.api.FlinkAPI; +import org.dinky.daemon.pool.ScheduleThreadPool; +import org.dinky.data.enums.JobStatus; +import org.dinky.data.model.ext.JobInfoDetail; +import org.dinky.data.model.job.JobInstance; +import org.dinky.service.JobInstanceService; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.support.PeriodicTrigger; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class MetricService { + + @Autowired + private JobInstanceService jobInstanceService; + + @Autowired + private ScheduleThreadPool schedule; + + private List metricCaches; + + private int currentPos = -1; + + private boolean isScheduleStart = false; + + private final int JOB_GROUP_COUNT = 4; + + protected abstract T formatFlinkJobMetrics(List> metrics); + + public T retrieveAllFlinkJobMetrics() { + if (this.metricCaches == null) { + synchronized (this) { + if (!this.isScheduleStart) { + this.isScheduleStart = true; + schedule.addSchedule( + "retrieve.all.flink.job.metrics", + () -> this.runRetrieveAllFlinkJobMetrics(false), + new PeriodicTrigger(1, TimeUnit.MINUTES)); + } + } + this.runRetrieveAllFlinkJobMetrics(true); + } + currentPos = (currentPos + 1) % metricCaches.size(); + return this.metricCaches.get(currentPos); + } + + private void runRetrieveAllFlinkJobMetrics(boolean isOnlyTaskStatus) { + List currentMetrics = new ArrayList<>(); + List jobInstances = jobInstanceService.listAllJobInstances(); + if (isOnlyTaskStatus) { + T allMetrics = this.runRetrieveAllFlinkJobMetricsWithJobList(jobInstances, true); + for (int i = 0; i < JOB_GROUP_COUNT; i++) { + currentMetrics.add(allMetrics); + } + } else { + jobInstances.sort(Comparator.comparing(JobInstance::getId)); + List> groups = new ArrayList<>(); + for (int i = 0; i < JOB_GROUP_COUNT; i++) { + groups.add(new ArrayList<>()); + } + for (int i = 0; i < jobInstances.size(); i++) { + groups.get(i % JOB_GROUP_COUNT).add(jobInstances.get(i)); + } + for (List group : groups) { + currentMetrics.add(this.runRetrieveAllFlinkJobMetricsWithJobList(group, false)); + } + } + this.metricCaches = currentMetrics; + } + + private T runRetrieveAllFlinkJobMetricsWithJobList(List jobInstances, boolean isOnlyTaskStatus) { + List> metrics = new ArrayList<>(); + for (JobInstance jobInstance : jobInstances) { + try { + JobInfoDetail jobInfoDetail = jobInstanceService.getJobInfoDetail(jobInstance.getId()); + metrics.add(this.retrieveFlinkJobStatusMetricByJobInfoDetail(jobInfoDetail)); + if ((!isOnlyTaskStatus) && jobInstance.getStatus().equals(JobStatus.RUNNING.toString())) { + metrics.addAll(this.retrieveTaskManagerMetricsByJobInfoDetail(jobInfoDetail)); + metrics.addAll(this.retrieveJobManagerMetricsByJobInfoDetail(jobInfoDetail)); + try { + metrics.addAll(this.retrieveJobVerticesMetricsByJobInfoDetail(jobInfoDetail)); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + return this.formatFlinkJobMetrics(metrics); + } + + private HashMap retrieveFlinkJobStatusMetricByJobInfoDetail(JobInfoDetail jobInfoDetail) { + HashMap metric = this.retrieveJobInfoMetrics(jobInfoDetail); + metric.put("name", MetricNames.DINKY_FLINK_TASK_IS_RUNNING); + metric.put("value", jobInfoDetail.getInstance().getStatus().equals(JobStatus.RUNNING.toString()) ? 1 : 0); + return metric; + } + + private List> retrieveTaskManagerMetricsByJobInfoDetail(JobInfoDetail jobInfoDetail) { + List> metricList = new ArrayList(); + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + List taskManagers = + Lists.newArrayList(flinkAPI.getTaskManagers().get("taskmanagers")); + for (JsonNode taskManager : taskManagers) { + String taskManagerId = taskManager.get("id").asText(); + JsonNode taskManagerMetrics = flinkAPI.getTaskManagerMetrics(taskManagerId); + for (JsonNode metric : Lists.newArrayList(taskManagerMetrics)) { + HashMap other = new HashMap<>(Map.of( + MetricKeys.DINKY_FLINK_TASK_MANAGER_ID, + taskManagerId.substring( + jobInfoDetail.getInstance().getName().length() + 1))); + metricList.add(buildMetric(jobInfoDetail, metric, other)); + } + } + return metricList; + } + + private List> retrieveJobManagerMetricsByJobInfoDetail(JobInfoDetail jobInfoDetail) { + List> metricList = new ArrayList(); + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + JsonNode jobManagerMetrics = flinkAPI.getJobManagerMetrics(); + for (JsonNode metric : Lists.newArrayList(jobManagerMetrics)) { + metricList.add(buildMetric(jobInfoDetail, metric)); + } + return metricList; + } + + private List> retrieveJobVerticesMetricsByJobInfoDetail(JobInfoDetail jobInfoDetail) { + List> metricList = new ArrayList<>(); + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + String jid = jobInfoDetail.getInstance().getJid(); + + for (String vertice : flinkAPI.getVertices(jid)) { + HashMap> whitelist = this.getWhitelistMetricsFromJobVertices(jobInfoDetail, vertice); + if (!whitelist.isEmpty()) { + HashMap other = new HashMap<>(Map.of(MetricKeys.DINKY_FLINK_TASK_VERTICE_ID, vertice)); + for (String name : whitelist.keySet()) { + List verticeMetrics = new ArrayList<>(); + for (List metricGroup : Lists.partition(whitelist.get(name), 5)) { + JsonNode groupMetrics = flinkAPI.getJobMetricsData(jid, vertice, String.join(",", metricGroup)); + verticeMetrics.addAll(Lists.newArrayList(groupMetrics)); + } + long value = this.getVerticeMetricValueByMetricName(name, verticeMetrics); + metricList.add(buildMetric(jobInfoDetail, name, value, other)); + } + } + metricList.addAll(this.retrieveJobVerticeBackPressureMetricsByJobInfoDetail(jobInfoDetail, vertice)); + } + + return metricList; + } + + private long getVerticeMetricValueByMetricName(String dinkyMetricName, List verticeMetrics) { + List values = new ArrayList<>(); + for (JsonNode metric : verticeMetrics) { + if (metric.has("value") && !metric.get("value").isNull()) { + values.add(metric.get("value").asLong()); + } + } + if (values.isEmpty()) { + return 0L; + } + switch (dinkyMetricName) { + case MetricNames.DINKY_FLINK_TASK_RECORDS_CONSUMED_RATE: + case MetricNames.DINKY_FLINK_TASK_BYTES_CONSUMED_RATE: + return (long) values.stream().mapToLong(Long::longValue).sum(); + + case MetricNames.DINKY_FLINK_TASK_RECORDS_LAG_MAX: + return values.stream().mapToLong(Long::longValue).max().orElse(0); + + case MetricNames.DINKY_FLINK_TASK_CURRENT_OFFSET: + case MetricNames.DINKY_FLINK_TASK_COMMITTED_OFFSET: + return values.stream().mapToLong(Long::longValue).max().orElse(0); + + case MetricNames.DINKY_FLINK_TASK_LAST_CHECKPOINT_DURATION: + case MetricNames.DINKY_FLINK_TASK_LAST_CHECKPOINT_SIZE: + return values.stream().mapToLong(Long::longValue).max().orElse(0); + + default: + return (long) values.stream().mapToLong(Long::longValue).sum(); + } + } + + private HashMap> getWhitelistMetricsFromJobVertices( + JobInfoDetail jobInfoDetail, String vertice) { + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + JsonNode metrics = + flinkAPI.getJobMetricsItems(jobInfoDetail.getInstance().getJid(), vertice); + + HashMap> whitelist = new HashMap<>(); + for (JsonNode metric : metrics) { + String id = metric.get("id").asText(); + MetricNames.FLINK_VERTICE_MERTICS_MAP.keySet().stream().forEach(m -> { + if (id.contains(m)) { + String name = MetricNames.FLINK_VERTICE_MERTICS_MAP.get(m); + if (MetricNames.HUDI_METRICS_LIST.stream().anyMatch(name::equals) && id.contains("hoodie")) { + name = name + "_HUDI"; + } + whitelist.computeIfAbsent(name, k -> new ArrayList<>()).add(id); + } + }); + } + return whitelist; + } + + private HashMap buildMetric( + JobInfoDetail jobInfoDetail, String name, long value, HashMap other) { + HashMap base = this.retrieveJobInfoMetrics(jobInfoDetail); + HashMap m = new HashMap<>(base); + m.put("name", name); + m.put("value", value); + if (other != null) { + m.putAll(other); + } + return m; + } + + private HashMap buildMetric( + JobInfoDetail jobInfoDetail, JsonNode metric, HashMap other) { + HashMap base = this.retrieveJobInfoMetrics(jobInfoDetail); + HashMap m = new HashMap<>(base); + String name = + "DINKY_FLINK_JOB_" + metric.get("id").asText().replace(".", "_").toUpperCase(); + long value = metric.get("value").asLong(); + m.put("name", name); + m.put("value", value); + if (other != null) { + m.putAll(other); + } + return m; + } + + private HashMap buildMetric(JobInfoDetail jobInfoDetail, JsonNode metric) { + return this.buildMetric(jobInfoDetail, metric, null); + } + + private List> retrieveJobVerticeBackPressureMetricsByJobInfoDetail( + JobInfoDetail jobInfoDetail, String vertice) { + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + JsonNode backPressure = + flinkAPI.getBackPressureJson(jobInfoDetail.getInstance().getJid(), vertice); + + int backpressureLevel = mapJobVerticeBackPressureLevel( + backPressure.get("backpressureLevel").asText()); + + List ratios = StreamSupport.stream(backPressure.get("subtasks").spliterator(), false) + .map(subtask -> subtask.get("ratio").asDouble()) + .collect(Collectors.toList()); + + double backpressureRateMin = + ratios.stream().mapToDouble(Double::doubleValue).min().orElse(0.0); + double backpressureRateMax = + ratios.stream().mapToDouble(Double::doubleValue).max().orElse(1.0); + + HashMap baseMetrics = this.retrieveJobInfoMetrics(jobInfoDetail); + + List> metricList = new ArrayList<>(); + Map metricsMap = Map.of( + MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_LEVEL, backpressureLevel, + MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_RATE_MAX, backpressureRateMax, + MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_RATE_MIN, backpressureRateMin); + + metricsMap.forEach((name, value) -> { + HashMap metric = new HashMap<>(baseMetrics); + metric.put("name", name); + metric.put("value", value); + metric.put(MetricKeys.DINKY_FLINK_TASK_VERTICE_ID, vertice); + metricList.add(metric); + }); + + return metricList; + } + + private int mapJobVerticeBackPressureLevel(String backpressureLevel) { + switch (backpressureLevel.toLowerCase()) { + case "ok": + return 1; + case "low": + return 2; + case "medium": + return 3; + case "high": + return 4; + default: + return 0; + } + } + + public HashMap retrieveJobInfoMetrics(JobInfoDetail jobInfoDetail) { + HashMap metrics = new HashMap(); + + metrics.put(MetricKeys.DINKY_FLINK_TASK_ID, jobInfoDetail.getInstance().getTaskId()); + metrics.put( + MetricKeys.DINKY_FLINK_TASK_NAME, jobInfoDetail.getInstance().getName()); + metrics.put( + MetricKeys.DINKY_FLINK_TASK_STATUS, jobInfoDetail.getInstance().getStatus()); + metrics.put( + MetricKeys.DINKY_FLINK_CLUSTER_NAME, + jobInfoDetail.getClusterConfiguration().getName()); + return metrics; + } +} diff --git a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java index 4993fbccac..d9b24d0b21 100644 --- a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java +++ b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java @@ -61,6 +61,8 @@ public interface JobInstanceService extends ISuperService { */ List listJobInstanceActive(); + List listAllJobInstances(); + /** * Get the job information detail for the given ID. * diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java index 236942209e..6c23a8c843 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java @@ -148,6 +148,11 @@ public List listJobInstanceActive() { return baseMapper.listJobInstanceActive(); } + @Override + public List listAllJobInstances() { + return baseMapper.listAllJobInstances(); + } + @Override public JobInfoDetail getJobInfoDetail(Integer id) { if (Asserts.isNull(TenantContextHolder.get())) { diff --git a/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml b/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml index a5ee9ed2dd..29f756d1e7 100644 --- a/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml +++ b/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml @@ -103,4 +103,16 @@ from dinky_job_instance where id = #{id} + + diff --git a/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java b/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java index 4e7fcf0593..16a2095c12 100644 --- a/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java +++ b/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java @@ -353,7 +353,7 @@ public JsonNode getTaskManagerMetrics(String containerId) { + containerId + FlinkRestAPIConstant.METRICS + FlinkRestAPIConstant.GET - + buildMetricsParams(FlinkRestAPIConstant.JOB_MANAGER)); + + buildMetricsParams(FlinkRestAPIConstant.TASK_MANAGER)); } /** @@ -406,6 +406,27 @@ public JsonNode getJobMetricsData(String jobId, String verticeId, String metrics + FlinkRestAPIConstant.METRICS + "?get=" + URLEncodeUtil.encode(metrics)); } + public JsonNode getJobVerticeMetrics(String jobId, String verticeId) { + JsonNode metrics = this.getJobMetricsItems(jobId, verticeId); + StringBuilder sb = new StringBuilder(); + for (JsonNode node : metrics) { + if (Asserts.isNull(node)) { + continue; + } + + final JsonNode id = node.get(ID); + if (Asserts.isNull(id)) { + continue; + } + + if (sb.length() > 0) { + sb.append(","); + } + sb.append(id.asText()); + } + return this.getJobMetricsData(jobId, verticeId, sb.toString()); + } + /** * GET backpressure */ @@ -417,6 +438,14 @@ public String getBackPressure(String jobId, String verticeId) { + FlinkRestAPIConstant.BACKPRESSURE); } + public JsonNode getBackPressureJson(String jobId, String verticeId) { + return get(FlinkRestAPIConstant.JOBS + + jobId + + FlinkRestAPIConstant.VERTICES + + verticeId + + FlinkRestAPIConstant.BACKPRESSURE); + } + /** * GET watermark */ From 034cc675fe591100f417cb2e53793868b575f1c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E8=B4=B5=E5=8F=91?= Date: Mon, 15 Sep 2025 14:56:12 +0800 Subject: [PATCH 2/5] =?UTF-8?q?[dinky-admin]=20=E6=B7=BB=E5=8A=A0Prometheu?= =?UTF-8?q?s=20exporter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dinky/controller/MetricController.java | 57 +++ .../dinky/job/handler/JobRefreshHandler.java | 49 +++ .../org/dinky/mapper/JobInstanceMapper.java | 6 + .../org/dinky/metric/base/MetricKeys.java | 35 ++ .../org/dinky/metric/base/MetricService.java | 365 ++++++++++++++++++ .../org/dinky/metric/base/MetricType.java | 37 ++ .../resources/mapper/JobInstanceMapper.xml | 12 + 7 files changed, 561 insertions(+) create mode 100644 dinky-admin/src/main/java/org/dinky/controller/MetricController.java create mode 100644 dinky-admin/src/main/java/org/dinky/metric/base/MetricKeys.java create mode 100644 dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java create mode 100644 dinky-admin/src/main/java/org/dinky/metric/base/MetricType.java diff --git a/dinky-admin/src/main/java/org/dinky/controller/MetricController.java b/dinky-admin/src/main/java/org/dinky/controller/MetricController.java new file mode 100644 index 0000000000..7a53d8e788 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/controller/MetricController.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.controller; + +import org.dinky.metric.PrometheusService; +import org.dinky.metric.base.MetricType; + +import java.util.List; + +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import cn.dev33.satoken.annotation.SaIgnore; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * MetricController + * + * @since 2025/9/4 11:32 + */ +@Slf4j +@Api(tags = "Metrric Controller") +@RestController +@RequestMapping("/metric") +@RequiredArgsConstructor +@SaIgnore +public class MetricController { + + private final PrometheusService prometheusService; + + @GetMapping(value = "/prometheus/{types}", produces = MediaType.TEXT_PLAIN_VALUE) + public String prometheus(@PathVariable List types) { + return prometheusService.retrieveAllFlinkJobMetrics(types); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java index 348c88cdf1..8ed2520e0c 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java @@ -226,6 +226,55 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave) } /** +<<<<<<< Updated upstream +======= + * In Flink operator mode, resource scaling triggers a job redeployment, which results in a new job ID. + * The system will update to the latest job ID accordingly. + * + * + * @param jobInfoDetail The job info detail. + */ + public static void checkAndRefreshJobId(JobInfoDetail jobInfoDetail) { + try { + if (!GatewayType.get(jobInfoDetail.getClusterInstance().getType()).isKubernetesApplicationMode()) { + return; + } + + List jobs = FlinkAPI.build( + jobInfoDetail.getClusterInstance().getJobManagerHost()) + .listJobs(); + if (jobs == null || jobs.isEmpty()) { + log.info( + "No running jobs found on task: {}", + jobInfoDetail.getClusterInstance().getJobManagerHost()); + return; + } + + JsonNode firstJob = jobs.stream().findFirst().orElse(jobs.get(0)); + String latestJobId = firstJob.get("jid").asText(); + String currentJobId = jobInfoDetail.getInstance().getJid(); + if (!latestJobId.equals(currentJobId)) { + JobInstance jobInstance = jobInfoDetail.getInstance(); + jobInstance.setJid(latestJobId); + jobInstanceService.updateById(jobInstance); + log.info( + "JobId for [{}] has been refreshed: {} -> {}", + jobInfoDetail.getInstance().getName(), + currentJobId, + latestJobId); + } else { + log.debug( + "JobId for [{}] is up to date: {}", + jobInfoDetail.getInstance().getName(), + currentJobId); + } + } catch (Exception e) { + log.warn("check and refresh jobId fail, {}", e.getMessage()); + } + } + + /** +>>>>>>> Stashed changes * Retrieves job history. * getJobStatusInformationFromFlinkRestAPI * diff --git a/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java b/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java index 55fd072a3e..5c5ebb87cf 100644 --- a/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java +++ b/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java @@ -49,6 +49,12 @@ public interface JobInstanceMapper extends SuperMapper { @InterceptorIgnore(tenantLine = "true") List listJobInstanceActive(); + @InterceptorIgnore(tenantLine = "true") + List listJobInstancesToRecheck(); + + @InterceptorIgnore(tenantLine = "true") + List listAllJobInstances(); + JobInstance getJobInstanceByTaskId(Integer id); @InterceptorIgnore(tenantLine = "true") diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricKeys.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricKeys.java new file mode 100644 index 0000000000..3cd050068d --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricKeys.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric.base; + +public class MetricKeys { + + public static final String DINKY_FLINK_TASK_ID = "task_id"; + public static final String DINKY_FLINK_TASK_NAME = "task_name"; + public static final String DINKY_FLINK_TASK_STATUS = "task_status"; + + public static final String DINKY_FLINK_TASK_MANAGER_ID = "task_manager_id"; + + public static final String DINKY_FLINK_CLUSTER_NAME = "task_cluster"; + + public static final String DINKY_FLINK_TASK_VERTICE_ID = "task_vertice_id"; + + public static final String DINKY_FLINK_TASK_DEPLOY_STATUS = "task_deploy_status"; +} diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java new file mode 100644 index 0000000000..01cd20d857 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java @@ -0,0 +1,365 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric.base; + +import org.dinky.api.FlinkAPI; +import org.dinky.daemon.pool.ScheduleThreadPool; +import org.dinky.data.enums.JobStatus; +import org.dinky.data.model.Task; +import org.dinky.data.model.ext.JobInfoDetail; +import org.dinky.data.model.job.JobInstance; +import org.dinky.service.JobInstanceService; +import org.dinky.service.TaskService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.support.PeriodicTrigger; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class MetricService { + + @Autowired + private JobInstanceService jobInstanceService; + + @Autowired + private TaskService taskService; + + @Autowired + private ScheduleThreadPool schedule; + + private HashMap metricCaches = new HashMap<>(); + + private boolean isScheduleStart = false; + + protected abstract T formatFlinkJobMetrics(List> metrics); + + public T retrieveAllFlinkJobMetrics(List types) { + if (this.metricCaches == null || this.metricCaches.isEmpty()) { + synchronized (this) { + if (!this.isScheduleStart) { + this.isScheduleStart = true; + schedule.addSchedule( + "retrieve.all.flink.job.metrics", + () -> this.runRetrieveAllFlinkJobMetrics(MetricType.STATUS), + new PeriodicTrigger(1, TimeUnit.MINUTES)); + schedule.addSchedule( + "retrieve.all.flink.job.metrics", + () -> this.runRetrieveAllFlinkJobMetrics(MetricType.JOBMANAGER), + new PeriodicTrigger(1, TimeUnit.MINUTES)); + schedule.addSchedule( + "retrieve.all.flink.job.metrics", + () -> this.runRetrieveAllFlinkJobMetrics(MetricType.TASKMANAGER), + new PeriodicTrigger(1, TimeUnit.MINUTES)); + schedule.addSchedule( + "retrieve.all.flink.job.metrics", + () -> this.runRetrieveAllFlinkJobMetrics(MetricType.VERTICES), + new PeriodicTrigger(2, TimeUnit.MINUTES)); + } + } + } + MetricType type = types == null || types.isEmpty() ? MetricType.STATUS : types.get(0); + return this.metricCaches.getOrDefault(type, null); + } + + private void runRetrieveAllFlinkJobMetrics(MetricType type) { + List jobInstances = jobInstanceService.listAllJobInstances(); + log.info("jobInstance count: {}", jobInstances.size()); + T allMetrics = this.runRetrieveAllFlinkJobMetricsWithJobList(jobInstances, type); + this.metricCaches.put(type, allMetrics); + } + + private T runRetrieveAllFlinkJobMetricsWithJobList(List jobInstances, MetricType type) { + List> metrics = new ArrayList<>(); + for (JobInstance jobInstance : jobInstances) { + long start = System.currentTimeMillis(); + try { + JobInfoDetail jobInfoDetail = jobInstanceService.getJobInfoDetail(jobInstance.getId()); + if (type == MetricType.STATUS) { + metrics.add(this.retrieveFlinkJobStatusMetricByJobInfoDetail(jobInfoDetail)); + } else if (jobInstance.getStatus().equals(JobStatus.RUNNING.toString())) { + switch (type) { + case JOBMANAGER: + metrics.addAll(this.retrieveTaskManagerMetricsByJobInfoDetail(jobInfoDetail)); + break; + case TASKMANAGER: + metrics.addAll(this.retrieveJobManagerMetricsByJobInfoDetail(jobInfoDetail)); + break; + case VERTICES: + metrics.addAll(this.retrieveJobVerticesMetricsByJobInfoDetail(jobInfoDetail)); + break; + } + } + } catch (Exception e) { + log.error("JobInstance {} failed: {}", jobInstance.getId(), e.getMessage(), e); + } finally { + long cost = System.currentTimeMillis() - start; + log.info("JobInstance {} metric type {} retrieval cost {} ms", jobInstance.getName(), type, cost); + } + } + return this.formatFlinkJobMetrics(metrics); + } + + private HashMap retrieveFlinkJobStatusMetricByJobInfoDetail(JobInfoDetail jobInfoDetail) { + HashMap metric = this.retrieveJobInfoMetrics(jobInfoDetail); + metric.put("name", MetricNames.DINKY_FLINK_TASK_IS_RUNNING); + metric.put("value", jobInfoDetail.getInstance().getStatus().equals(JobStatus.RUNNING.toString()) ? 1 : 0); + return metric; + } + + private List> retrieveTaskManagerMetricsByJobInfoDetail(JobInfoDetail jobInfoDetail) { + + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + List taskManagers = + Lists.newArrayList(flinkAPI.getTaskManagers().get("taskmanagers")); + return taskManagers.parallelStream() + .flatMap(taskManager -> { + List> metricList = new ArrayList(); + String taskManagerId = taskManager.get("id").asText(); + JsonNode taskManagerMetrics = flinkAPI.getTaskManagerMetrics(taskManagerId); + for (JsonNode metric : Lists.newArrayList(taskManagerMetrics)) { + HashMap other = new HashMap<>(Map.of( + MetricKeys.DINKY_FLINK_TASK_MANAGER_ID, + taskManagerId.substring( + jobInfoDetail.getInstance().getName().length() + 1))); + metricList.add(buildMetric(jobInfoDetail, metric, other)); + } + return metricList.stream(); + }) + .collect(Collectors.toList()); + } + + private List> retrieveJobManagerMetricsByJobInfoDetail(JobInfoDetail jobInfoDetail) { + List> metricList = new ArrayList(); + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + JsonNode jobManagerMetrics = flinkAPI.getJobManagerMetrics(); + for (JsonNode metric : Lists.newArrayList(jobManagerMetrics)) { + metricList.add(buildMetric(jobInfoDetail, metric)); + } + return metricList; + } + + private List> retrieveJobVerticesMetricsByJobInfoDetail(JobInfoDetail jobInfoDetail) { + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + String jid = jobInfoDetail.getInstance().getJid(); + return flinkAPI.getVertices(jid).parallelStream() + .flatMap(vertice -> { + List> metricList = new ArrayList<>(); + HashMap> whitelist = + this.getWhitelistMetricsFromJobVertices(jobInfoDetail, vertice); + if (!whitelist.isEmpty()) { + HashMap other = + new HashMap<>(Map.of(MetricKeys.DINKY_FLINK_TASK_VERTICE_ID, vertice)); + for (String name : whitelist.keySet()) { + List verticeMetrics = new ArrayList<>(); + for (List metricGroup : Lists.partition(whitelist.get(name), 15)) { + JsonNode groupMetrics = + flinkAPI.getJobMetricsData(jid, vertice, String.join(",", metricGroup)); + verticeMetrics.addAll(Lists.newArrayList(groupMetrics)); + } + long value = this.getVerticeMetricValueByMetricName(name, verticeMetrics); + metricList.add(buildMetric(jobInfoDetail, name, value, other)); + } + } + metricList.addAll( + this.retrieveJobVerticeBackPressureMetricsByJobInfoDetail(jobInfoDetail, vertice)); + return metricList.stream(); + }) + .collect(Collectors.toList()); + } + + private long getVerticeMetricValueByMetricName(String dinkyMetricName, List verticeMetrics) { + List values = new ArrayList<>(); + for (JsonNode metric : verticeMetrics) { + if (metric.has("value") && !metric.get("value").isNull()) { + values.add(metric.get("value").asLong()); + } + } + if (values.isEmpty()) { + return 0L; + } + switch (dinkyMetricName) { + case MetricNames.DINKY_FLINK_TASK_RECORDS_CONSUMED_RATE: + case MetricNames.DINKY_FLINK_TASK_BYTES_CONSUMED_RATE: + return (long) values.stream().mapToLong(Long::longValue).sum(); + + case MetricNames.DINKY_FLINK_TASK_RECORDS_LAG_MAX: + return values.stream().mapToLong(Long::longValue).max().orElse(0); + + case MetricNames.DINKY_FLINK_TASK_CURRENT_OFFSET: + case MetricNames.DINKY_FLINK_TASK_COMMITTED_OFFSET: + return values.stream().mapToLong(Long::longValue).max().orElse(0); + + case MetricNames.DINKY_FLINK_TASK_LAST_CHECKPOINT_DURATION: + case MetricNames.DINKY_FLINK_TASK_LAST_CHECKPOINT_SIZE: + return values.stream().mapToLong(Long::longValue).max().orElse(0); + + default: + return (long) values.stream().mapToLong(Long::longValue).sum(); + } + } + + private HashMap> getWhitelistMetricsFromJobVertices( + JobInfoDetail jobInfoDetail, String vertice) { + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + JsonNode metrics = + flinkAPI.getJobMetricsItems(jobInfoDetail.getInstance().getJid(), vertice); + + HashMap> whitelist = new HashMap<>(); + for (JsonNode metric : metrics) { + String id = metric.get("id").asText(); + MetricNames.FLINK_VERTICE_MERTICS_MAP.keySet().stream().forEach(m -> { + if (id.contains(m)) { + String name = MetricNames.FLINK_VERTICE_MERTICS_MAP.get(m); + if (MetricNames.HUDI_METRICS_LIST.stream().anyMatch(name::equals) && id.contains("hoodie")) { + name = name + "_HUDI"; + } + whitelist.computeIfAbsent(name, k -> new ArrayList<>()).add(id); + } + }); + } + return whitelist; + } + + private HashMap buildMetric( + JobInfoDetail jobInfoDetail, String name, long value, HashMap other) { + HashMap base = this.retrieveJobInfoMetrics(jobInfoDetail); + HashMap m = new HashMap<>(base); + m.put("name", name); + m.put("value", value); + if (other != null) { + m.putAll(other); + } + return m; + } + + private HashMap buildMetric( + JobInfoDetail jobInfoDetail, JsonNode metric, HashMap other) { + HashMap base = this.retrieveJobInfoMetrics(jobInfoDetail); + HashMap m = new HashMap<>(base); + String name = + "DINKY_FLINK_JOB_" + metric.get("id").asText().replace(".", "_").toUpperCase(); + long value = metric.get("value").asLong(); + m.put("name", name); + m.put("value", value); + if (other != null) { + m.putAll(other); + } + return m; + } + + private HashMap buildMetric(JobInfoDetail jobInfoDetail, JsonNode metric) { + return this.buildMetric(jobInfoDetail, metric, null); + } + + private List> retrieveJobVerticeBackPressureMetricsByJobInfoDetail( + JobInfoDetail jobInfoDetail, String vertice) { + FlinkAPI flinkAPI = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()); + JsonNode backPressure = + flinkAPI.getBackPressureJson(jobInfoDetail.getInstance().getJid(), vertice); + + int backpressureLevel = mapJobVerticeBackPressureLevel( + backPressure.get("backpressureLevel").asText()); + + List ratios = StreamSupport.stream(backPressure.get("subtasks").spliterator(), false) + .map(subtask -> subtask.get("ratio").asDouble()) + .collect(Collectors.toList()); + + double backpressureRateMin = + ratios.stream().mapToDouble(Double::doubleValue).min().orElse(0.0); + double backpressureRateMax = + ratios.stream().mapToDouble(Double::doubleValue).max().orElse(1.0); + + HashMap baseMetrics = this.retrieveJobInfoMetrics(jobInfoDetail); + + List> metricList = new ArrayList<>(); + Map metricsMap = Map.of( + MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_LEVEL, backpressureLevel, + MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_RATE_MAX, backpressureRateMax, + MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_RATE_MIN, backpressureRateMin); + + metricsMap.forEach((name, value) -> { + HashMap metric = new HashMap<>(baseMetrics); + metric.put("name", name); + metric.put("value", value); + metric.put(MetricKeys.DINKY_FLINK_TASK_VERTICE_ID, vertice); + metricList.add(metric); + }); + + return metricList; + } + + private int mapJobVerticeBackPressureLevel(String backpressureLevel) { + switch (backpressureLevel.toLowerCase()) { + case "ok": + return 1; + case "low": + return 2; + case "medium": + return 3; + case "high": + return 4; + default: + return 0; + } + } + + public HashMap retrieveJobInfoMetrics(JobInfoDetail jobInfoDetail) { + HashMap metrics = new HashMap(); + + if (taskService != null) { + Task task = taskService.getById(jobInfoDetail.getInstance().getTaskId()); + + if (task != null) { + metrics.put( + MetricKeys.DINKY_FLINK_TASK_STATUS, + jobInfoDetail.getInstance().getStatus()); + metrics.put(MetricKeys.DINKY_FLINK_TASK_DEPLOY_STATUS, task.getStep()); + metrics.put(MetricKeys.DINKY_FLINK_TASK_NAME, task.getName()); + } + } + + if (!metrics.containsKey(MetricKeys.DINKY_FLINK_TASK_NAME)) { + metrics.put( + MetricKeys.DINKY_FLINK_TASK_NAME, + jobInfoDetail.getInstance().getName()); + } + + metrics.put(MetricKeys.DINKY_FLINK_TASK_ID, jobInfoDetail.getInstance().getTaskId()); + + if (jobInfoDetail.getClusterConfiguration() != null) { + metrics.put( + MetricKeys.DINKY_FLINK_CLUSTER_NAME, + jobInfoDetail.getClusterConfiguration().getName()); + } + return metrics; + } +} diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricType.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricType.java new file mode 100644 index 0000000000..5774f2b7f8 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricType.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metric.base; + +public enum MetricType { + STATUS(0), + JOBMANAGER(1), + TASKMANAGER(2), + VERTICES(3); + + private final int value; + + MetricType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } +} diff --git a/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml b/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml index a5ee9ed2dd..977a0ac776 100644 --- a/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml +++ b/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml @@ -103,4 +103,16 @@ from dinky_job_instance where id = #{id} + + From 03c585f10f8cd40badace81dbd1930c106dcbeb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E8=B4=B5=E5=8F=91?= Date: Mon, 15 Sep 2025 16:28:06 +0800 Subject: [PATCH 3/5] [dinky-admin] add merge metrics --- .../dinky/controller/MetricController.java | 5 ++ .../dinky/job/handler/JobRefreshHandler.java | 49 ------------------- .../org/dinky/metric/PrometheusService.java | 5 ++ .../org/dinky/metric/base/MetricService.java | 25 ++++++---- 4 files changed, 25 insertions(+), 59 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/controller/MetricController.java b/dinky-admin/src/main/java/org/dinky/controller/MetricController.java index 7a53d8e788..ec82fcd3c1 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/MetricController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/MetricController.java @@ -50,6 +50,11 @@ public class MetricController { private final PrometheusService prometheusService; + @GetMapping(value = "/prometheus", produces = MediaType.TEXT_PLAIN_VALUE) + public String prometheus() { + return prometheusService.retrieveAllFlinkJobMetrics(null); + } + @GetMapping(value = "/prometheus/{types}", produces = MediaType.TEXT_PLAIN_VALUE) public String prometheus(@PathVariable List types) { return prometheusService.retrieveAllFlinkJobMetrics(types); diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java index 8ed2520e0c..348c88cdf1 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java @@ -226,55 +226,6 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave) } /** -<<<<<<< Updated upstream -======= - * In Flink operator mode, resource scaling triggers a job redeployment, which results in a new job ID. - * The system will update to the latest job ID accordingly. - * - * - * @param jobInfoDetail The job info detail. - */ - public static void checkAndRefreshJobId(JobInfoDetail jobInfoDetail) { - try { - if (!GatewayType.get(jobInfoDetail.getClusterInstance().getType()).isKubernetesApplicationMode()) { - return; - } - - List jobs = FlinkAPI.build( - jobInfoDetail.getClusterInstance().getJobManagerHost()) - .listJobs(); - if (jobs == null || jobs.isEmpty()) { - log.info( - "No running jobs found on task: {}", - jobInfoDetail.getClusterInstance().getJobManagerHost()); - return; - } - - JsonNode firstJob = jobs.stream().findFirst().orElse(jobs.get(0)); - String latestJobId = firstJob.get("jid").asText(); - String currentJobId = jobInfoDetail.getInstance().getJid(); - if (!latestJobId.equals(currentJobId)) { - JobInstance jobInstance = jobInfoDetail.getInstance(); - jobInstance.setJid(latestJobId); - jobInstanceService.updateById(jobInstance); - log.info( - "JobId for [{}] has been refreshed: {} -> {}", - jobInfoDetail.getInstance().getName(), - currentJobId, - latestJobId); - } else { - log.debug( - "JobId for [{}] is up to date: {}", - jobInfoDetail.getInstance().getName(), - currentJobId); - } - } catch (Exception e) { - log.warn("check and refresh jobId fail, {}", e.getMessage()); - } - } - - /** ->>>>>>> Stashed changes * Retrieves job history. * getJobStatusInformationFromFlinkRestAPI * diff --git a/dinky-admin/src/main/java/org/dinky/metric/PrometheusService.java b/dinky-admin/src/main/java/org/dinky/metric/PrometheusService.java index d0c889696d..431a7c2c60 100644 --- a/dinky-admin/src/main/java/org/dinky/metric/PrometheusService.java +++ b/dinky-admin/src/main/java/org/dinky/metric/PrometheusService.java @@ -66,4 +66,9 @@ protected String formatFlinkJobMetrics(List> metrics) { return sb.toString(); } + + @Override + protected String mergeFlinkJobMetrics(List metricGroups) { + return String.join("\n", metricGroups); + } } diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java index 01cd20d857..ccd645a5bd 100644 --- a/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java @@ -29,6 +29,7 @@ import org.dinky.service.TaskService; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,13 +63,15 @@ public abstract class MetricService { protected abstract T formatFlinkJobMetrics(List> metrics); + protected abstract T mergeFlinkJobMetrics(List metricGroups); + public T retrieveAllFlinkJobMetrics(List types) { - if (this.metricCaches == null || this.metricCaches.isEmpty()) { + if (!this.isScheduleStart) { synchronized (this) { if (!this.isScheduleStart) { this.isScheduleStart = true; schedule.addSchedule( - "retrieve.all.flink.job.metrics", + "retrieve.all.flink.status.metrics", () -> this.runRetrieveAllFlinkJobMetrics(MetricType.STATUS), new PeriodicTrigger(1, TimeUnit.MINUTES)); schedule.addSchedule( @@ -76,18 +79,24 @@ public T retrieveAllFlinkJobMetrics(List types) { () -> this.runRetrieveAllFlinkJobMetrics(MetricType.JOBMANAGER), new PeriodicTrigger(1, TimeUnit.MINUTES)); schedule.addSchedule( - "retrieve.all.flink.job.metrics", + "retrieve.all.flink.task.metrics", () -> this.runRetrieveAllFlinkJobMetrics(MetricType.TASKMANAGER), new PeriodicTrigger(1, TimeUnit.MINUTES)); schedule.addSchedule( - "retrieve.all.flink.job.metrics", + "retrieve.all.flink.vertices.metrics", () -> this.runRetrieveAllFlinkJobMetrics(MetricType.VERTICES), new PeriodicTrigger(2, TimeUnit.MINUTES)); } } } - MetricType type = types == null || types.isEmpty() ? MetricType.STATUS : types.get(0); - return this.metricCaches.getOrDefault(type, null); + List _types = types == null || types.isEmpty() + ? Arrays.asList(MetricType.STATUS, MetricType.JOBMANAGER, MetricType.TASKMANAGER, MetricType.VERTICES) + : types; + List metricGroups = this.metricCaches.entrySet().stream() + .filter(e -> _types.contains(e.getKey())) + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + return mergeFlinkJobMetrics(metricGroups); } private void runRetrieveAllFlinkJobMetrics(MetricType type) { @@ -100,7 +109,6 @@ private void runRetrieveAllFlinkJobMetrics(MetricType type) { private T runRetrieveAllFlinkJobMetricsWithJobList(List jobInstances, MetricType type) { List> metrics = new ArrayList<>(); for (JobInstance jobInstance : jobInstances) { - long start = System.currentTimeMillis(); try { JobInfoDetail jobInfoDetail = jobInstanceService.getJobInfoDetail(jobInstance.getId()); if (type == MetricType.STATUS) { @@ -120,9 +128,6 @@ private T runRetrieveAllFlinkJobMetricsWithJobList(List jobInstance } } catch (Exception e) { log.error("JobInstance {} failed: {}", jobInstance.getId(), e.getMessage(), e); - } finally { - long cost = System.currentTimeMillis() - start; - log.info("JobInstance {} metric type {} retrieval cost {} ms", jobInstance.getName(), type, cost); } } return this.formatFlinkJobMetrics(metrics); From 70024bdc1b22290ae59798a8b85d21101fca3465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E8=B4=B5=E5=8F=91?= Date: Mon, 15 Sep 2025 16:32:12 +0800 Subject: [PATCH 4/5] [dinky-admin] Add support for exporting Prometheus monitoring data --- .../src/main/java/org/dinky/mapper/JobInstanceMapper.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java b/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java index 5c5ebb87cf..343156193c 100644 --- a/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java +++ b/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java @@ -49,9 +49,6 @@ public interface JobInstanceMapper extends SuperMapper { @InterceptorIgnore(tenantLine = "true") List listJobInstanceActive(); - @InterceptorIgnore(tenantLine = "true") - List listJobInstancesToRecheck(); - @InterceptorIgnore(tenantLine = "true") List listAllJobInstances(); From b1bab52c27b98b888523476a671856175801c5af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E8=B4=B5=E5=8F=91?= Date: Mon, 15 Sep 2025 18:01:37 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E5=8E=BB=E6=8E=89Map.of?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/dinky/metric/base/MetricService.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java b/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java index ccd645a5bd..3d3739c98c 100644 --- a/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java +++ b/dinky-admin/src/main/java/org/dinky/metric/base/MetricService.java @@ -151,10 +151,11 @@ private List> retrieveTaskManagerMetricsByJobInfoDetail( String taskManagerId = taskManager.get("id").asText(); JsonNode taskManagerMetrics = flinkAPI.getTaskManagerMetrics(taskManagerId); for (JsonNode metric : Lists.newArrayList(taskManagerMetrics)) { - HashMap other = new HashMap<>(Map.of( + HashMap other = new HashMap<>(); + other.put( MetricKeys.DINKY_FLINK_TASK_MANAGER_ID, taskManagerId.substring( - jobInfoDetail.getInstance().getName().length() + 1))); + jobInfoDetail.getInstance().getName().length() + 1)); metricList.add(buildMetric(jobInfoDetail, metric, other)); } return metricList.stream(); @@ -181,8 +182,8 @@ private List> retrieveJobVerticesMetricsByJobInfoDetail( HashMap> whitelist = this.getWhitelistMetricsFromJobVertices(jobInfoDetail, vertice); if (!whitelist.isEmpty()) { - HashMap other = - new HashMap<>(Map.of(MetricKeys.DINKY_FLINK_TASK_VERTICE_ID, vertice)); + HashMap other = new HashMap<>(); + other.put(MetricKeys.DINKY_FLINK_TASK_VERTICE_ID, vertice); for (String name : whitelist.keySet()) { List verticeMetrics = new ArrayList<>(); for (List metricGroup : Lists.partition(whitelist.get(name), 15)) { @@ -306,10 +307,10 @@ private List> retrieveJobVerticeBackPressureMetricsByJob HashMap baseMetrics = this.retrieveJobInfoMetrics(jobInfoDetail); List> metricList = new ArrayList<>(); - Map metricsMap = Map.of( - MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_LEVEL, backpressureLevel, - MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_RATE_MAX, backpressureRateMax, - MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_RATE_MIN, backpressureRateMin); + Map metricsMap = new HashMap<>(); + metricsMap.put(MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_LEVEL, backpressureLevel); + metricsMap.put(MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_RATE_MAX, backpressureRateMax); + metricsMap.put(MetricNames.DINKY_FLINK_TASK_BACKPRESSURE_RATE_MIN, backpressureRateMin); metricsMap.forEach((name, value) -> { HashMap metric = new HashMap<>(baseMetrics);