Skip to content

Commit

Permalink
optimize_websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh committed Jan 26, 2025
1 parent 170b1cf commit 0bc3c82
Show file tree
Hide file tree
Showing 29 changed files with 367 additions and 335 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,26 @@ public Executor scheduleRefreshMonitorDataExecutor() {
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}

/**
*websocket event thread executor
* @return executor
*/
@Bean
public Executor wsSendExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 核心线程数
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
// 最大线程数
threadPoolTaskExecutor.setMaxPoolSize(200);
// 配置队列大小
threadPoolTaskExecutor.setQueueCapacity(1000);
// 配置线程池前缀
threadPoolTaskExecutor.setThreadNamePrefix("ws-send-");
// 阻塞策略
threadPoolTaskExecutor.setRejectedExecutionHandler(new BlockPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package org.dinky.configure;

import java.util.List;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import org.dinky.ws.handler.WsMessageEventHandler;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.web.embedded.undertow.UndertowBuilderCustomizer;
import org.springframework.boot.web.embedded.undertow.UndertowDeploymentInfoCustomizer;
Expand All @@ -29,8 +32,12 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.annotation.PostConstruct;

@Configuration
@AllArgsConstructor
public class WebSocketConfiguration {
private final List<WsMessageEventHandler> wsMessageEventHandlerList;

@Bean
public ServerEndpointExporter serverEndpointExporter() {
Expand All @@ -48,4 +55,11 @@ UndertowServletWebServerFactory undertowServletWebServerFactory(
.addAll(builderCustomizers.orderedStream().collect(Collectors.toList()));
return factory;
}

@PostConstruct
public void init() {
wsMessageEventHandlerList.forEach(WsMessageEventHandler::run);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.dinky.context;

import static org.dinky.ws.GlobalWebSocket.sendTopic;

import cn.hutool.core.map.MapUtil;
import cn.hutool.extra.spring.SpringUtil;
import org.dinky.aop.ProcessAspect;
import org.dinky.data.constant.DirConstant;
import org.dinky.data.enums.ProcessStatus;
Expand All @@ -31,7 +31,6 @@
import org.dinky.data.model.ProcessEntity;
import org.dinky.data.model.ProcessStepEntity;
import org.dinky.utils.LogUtil;
import org.dinky.ws.GlobalWebSocketTopic;

import org.apache.http.util.TextUtils;

Expand All @@ -41,18 +40,17 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.dinky.ws.handler.ProcessConsole;
import org.slf4j.MDC;

import com.alibaba.fastjson2.JSONObject;

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.StrFormatter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -69,17 +67,17 @@ public static ConsoleContextHolder getInstances() {
return instance;
}

private final Map<String, ProcessEntity> logPross = new ConcurrentHashMap<>();
private final Map<String, ProcessEntity> logProcess = new ConcurrentHashMap<>();

/**
* Get a list of all processes
*/
public List<ProcessEntity> list() {
return new ArrayList<>(logPross.values());
return new ArrayList<>(logProcess.values());
}

public synchronized ProcessEntity killProcess(String processName) {
ProcessEntity process = logPross.get(processName);
ProcessEntity process = logProcess.get(processName);
if (process == null) {
return getProcess(processName);
}
Expand All @@ -98,8 +96,8 @@ public synchronized ProcessEntity killProcess(String processName) {
}

public ProcessEntity getProcess(String processName) {
if (logPross.containsKey(processName)) {
return logPross.get(processName);
if (logProcess.containsKey(processName)) {
return logProcess.get(processName);
}
try {
String filePath = String.format("%s/log/%s.json", DirConstant.getTempRootDir(), processName);
Expand Down Expand Up @@ -133,11 +131,11 @@ public boolean clearProcessLog(String processName) {
* @throws BusException Throws an exception if the process does not exist
*/
public void appendLog(String processName, String stepPid, String logLine, boolean recordGlobal) {
if (!logPross.containsKey(processName)) {
if (!logProcess.containsKey(processName)) {
log.debug("Process {} does not exist, This log was abandoned", processName);
return;
}
ProcessEntity process = logPross.get(processName);
ProcessEntity process = logProcess.get(processName);
if (recordGlobal) {
process.appendLog(logLine);
}
Expand All @@ -151,11 +149,9 @@ public void appendLog(String processName, String stepPid, String logLine, boolea
}
process.setLastUpdateStep(stepNode);
}
CompletableFuture.runAsync(() -> {
sendTopic(
GlobalWebSocketTopic.PROCESS_CONSOLE,
MapUtil.<String, Object>builder(processName, process).build());
});
// send ws event
Map<String, Object> data = MapUtil.<String,Object>builder(processName, process).build();
SpringUtil.getBean(ProcessConsole.class).sendData(data);
}

/**
Expand All @@ -166,7 +162,7 @@ public void appendLog(String processName, String stepPid, String logLine, boolea
* @throws RuntimeException Throws an exception if the process already exists
*/
public synchronized void registerProcess(ProcessType type, String processName) throws RuntimeException {
if (logPross.containsKey(processName)) {
if (logProcess.containsKey(processName)) {
throw new BusException(Status.PROCESS_REGISTER_EXITS);
}
ProcessEntity entity = ProcessEntity.builder()
Expand All @@ -179,7 +175,7 @@ public synchronized void registerProcess(ProcessType type, String processName) t
.children(new CopyOnWriteArrayList<>())
.threadId(Thread.currentThread().getId())
.build();
logPross.put(processName, entity);
logProcess.put(processName, entity);
appendLog(processName, null, "Start Process:" + processName, true);
}

Expand All @@ -203,11 +199,11 @@ public ProcessStepEntity registerProcessStep(ProcessStepType type, String proces
.children(new CopyOnWriteArrayList<>())
.build();

if (!logPross.containsKey(processName)) {
if (!logProcess.containsKey(processName)) {
log.error(StrFormatter.format("Process {} does not exist", type));
return processStepEntity;
}
ProcessEntity process = logPross.get(processName);
ProcessEntity process = logProcess.get(processName);
process.setStatus(ProcessStatus.RUNNING);
if (TextUtils.isEmpty(parentStepPid)) {
// parentStep为空表示为顶级节点
Expand All @@ -231,7 +227,7 @@ public ProcessStepEntity registerProcessStep(ProcessStepType type, String proces
* @param e exception object, optional
*/
public synchronized void finishedProcess(String processName, ProcessStatus status, Throwable e) {
ProcessEntity process = logPross.get(processName);
ProcessEntity process = logProcess.get(processName);
try {
process.setStatus(status);
process.setEndTime(LocalDateTime.now());
Expand All @@ -254,7 +250,7 @@ public synchronized void finishedProcess(String processName, ProcessStatus statu
appendLog(processName, null, LogUtil.getError(ex.getCause()), true);
log.error("finishedProcess error", ex);
} finally {
logPross.remove(processName);
logProcess.remove(processName);
}
}

Expand All @@ -267,7 +263,7 @@ public synchronized void finishedProcess(String processName, ProcessStatus statu
* @param e exception object, optional
*/
public void finishedStep(String processName, ProcessStepEntity step, ProcessStatus status, Exception e) {
if (!logPross.containsKey(processName)) {
if (!logProcess.containsKey(processName)) {
return;
}
step.setStatus(status);
Expand All @@ -291,7 +287,7 @@ private ProcessStepEntity getStepNode(String stepPid, CopyOnWriteArrayList<Proce
String errorStr = StrFormatter.format(
"Get Parent Node Failed, This is most likely a Dinky bug, "
+ "please report the following information back to the community:\nProcess:{},\nstep:{},\nprocessNam:{}",
JSONObject.toJSONString(logPross),
JSONObject.toJSONString(logProcess),
stepPid,
MDC.get(ProcessAspect.PROCESS_NAME));
log.debug(errorStr);
Expand All @@ -316,6 +312,6 @@ private ProcessStepEntity findStepNode(String stepPid, CopyOnWriteArrayList<Proc
}

private CopyOnWriteArrayList<ProcessStepEntity> getStepsMap(String processName) {
return logPross.get(processName).getChildren();
return logProcess.get(processName).getChildren();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
package org.dinky.context;

import static org.dinky.data.constant.MonitorTableConstant.JOB_ID;
import static org.dinky.ws.GlobalWebSocket.sendTopic;

import cn.hutool.extra.spring.SpringUtil;
import org.dinky.data.constant.MonitorTableConstant;
import org.dinky.data.vo.MetricsVO;
import org.dinky.utils.SqliteUtil;
import org.dinky.ws.GlobalWebSocketTopic;

import java.sql.SQLException;
import java.util.ArrayList;
Expand All @@ -34,20 +33,16 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import cn.hutool.core.map.MapUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.dinky.ws.handler.Metrics;
import org.dinky.ws.handler.ProcessConsole;

/**
* The MetricsContextHolder class is used to manage the metric context,
Expand All @@ -70,32 +65,6 @@ public class MetricsContextHolder {
SqliteUtil.INSTANCE.createTable(MonitorTableConstant.DINKY_METRICS, sql);
}

// Create a ThreadFactory with custom naming
ThreadFactory namedThreadFactory =
new ThreadFactoryBuilder().setNameFormat("metrics-send-thread-%d").build();

// Create a custom ThreadPoolExecutor
ExecutorService pool = new ThreadPoolExecutor(
5, // Core pool size
10, // Maximum pool size, allows the pool to expand as needed
60L, // Keep alive time for idle threads
TimeUnit.SECONDS, // Unit of keep alive time
new LinkedBlockingQueue<>(10), // Use a larger queue to hold excess tasks
namedThreadFactory);

public void sendAsync(String key, MetricsVO o) {
Object content = o.getContent();
if (content == null
|| (content instanceof ConcurrentHashMap && ((ConcurrentHashMap<?, ?>) content).isEmpty())) {
return; // Return early to avoid unnecessary operations
}
pool.execute(() -> {
Map<String, Object> result =
MapUtil.<String, Object>builder().put(key, o).build();
sendTopic(GlobalWebSocketTopic.METRICS, result); // Ensure only successfully added metrics are sent
});
}

public void saveToSqlite(String key, MetricsVO o) {
Object content = o.getContent();
if (content == null
Expand All @@ -119,10 +88,12 @@ public void saveToSqlite(String key, MetricsVO o) {
}
metricsVOS.clear();
}
Map<String, Object> result =
Map<String, Object> data =
MapUtil.<String, Object>builder().put(key, o).build();
;
sendTopic(GlobalWebSocketTopic.METRICS, result);

// send ws event
SpringUtil.getBean(ProcessConsole.class).sendData(data);

}

public List<List<String>> convertMetricsVOsToStringList(List<MetricsVO> metricsVOS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@
import lombok.Data;

@Data
public class SseDataVo {
public class WsDataVo {
private String sessionKey;
private String topic;
private Object data;
private GlobalWebSocket.RequestDTO.EventType type;

public SseDataVo(String sessionKey, String topic, Object data) {
public WsDataVo(String sessionKey, String topic, Object data) {
this.sessionKey = sessionKey;
this.topic = topic;
this.data = data;
}

public SseDataVo(String sessionKey, GlobalWebSocket.RequestDTO.EventType type) {
public WsDataVo(String sessionKey, GlobalWebSocket.RequestDTO.EventType type) {
this.sessionKey = sessionKey;
this.type = type;
}
Expand Down
Loading

0 comments on commit 0bc3c82

Please sign in to comment.