From 7587352a9fb3da69c0fd8963581f778885e0863c Mon Sep 17 00:00:00 2001 From: "mingxing.ai" Date: Sat, 2 Sep 2023 09:49:11 +0800 Subject: [PATCH] init --- .../auth/service/ClientLoginService.java | 9 +++++- .../handler/SocketIOServerHandler.java | 7 +++-- .../common/websocket/message/Message.java | 18 ------------ .../websocket/message/RegisterMessage.java | 29 +++++++++++++++++++ .../session/model/SocketIOSession.java | 5 ++++ .../crawler/ProductPriceCrawlerJob.java | 4 ++- .../core/helper/ProductPriceHelper.java | 8 +++-- .../websocket/WebSocketMessageSender.java | 8 ++--- 8 files changed, 60 insertions(+), 28 deletions(-) delete mode 100644 ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/message/Message.java create mode 100644 ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/message/RegisterMessage.java diff --git a/ruoyi-auth/src/main/java/com/ruoyi/auth/service/ClientLoginService.java b/ruoyi-auth/src/main/java/com/ruoyi/auth/service/ClientLoginService.java index 0f93c9193..81420c6f0 100644 --- a/ruoyi-auth/src/main/java/com/ruoyi/auth/service/ClientLoginService.java +++ b/ruoyi-auth/src/main/java/com/ruoyi/auth/service/ClientLoginService.java @@ -54,7 +54,14 @@ public LoginUser login(String username, String password) { } public void register(String username, String password) { - + if (StringUtils.isAnyBlank(username, password)) + { + throw new ServiceException("username/password is not blank"); + } + ClientUser clientUser = new ClientUser(); + clientUser.setUserName(username); + clientUser.setPassword(password); + remoteClientUserService.registerUserInfo(clientUser, SecurityConstants.INNER); } } diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/handler/SocketIOServerHandler.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/handler/SocketIOServerHandler.java index 9801d44c5..e3a945269 100644 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/handler/SocketIOServerHandler.java +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/handler/SocketIOServerHandler.java @@ -6,6 +6,7 @@ import com.corundumstudio.socketio.annotation.OnDisconnect; import com.corundumstudio.socketio.annotation.OnEvent; import com.ruoyi.common.core.constant.TokenConstants; +import com.ruoyi.common.websocket.message.RegisterMessage; import com.ruoyi.common.websocket.session.SocketIOSessionPool; import com.ruoyi.common.websocket.session.model.SocketIOSession; import com.sun.nio.sctp.MessageInfo; @@ -48,12 +49,14 @@ public void onHeartbeat(SocketIOClient socketIOClient, AckRequest ackRequest) { } @OnEvent("registry") - public void onRegistry(SocketIOClient socketIOClient, String registry) { + public void onRegistry(SocketIOClient socketIOClient, RegisterMessage message) { + log.info("接收到消息注册:{}", message); SocketIOSession socketIOSession = sessionPool.getSession(socketIOClient.getSessionId().toString()); if (socketIOSession == null) { return; } - socketIOSession.setTopic(registry); + socketIOSession.setTopic(message.getProduct()); + socketIOSession.setInterval(message.getInterval()); socketIOSession.setHeartbeatTime(System.currentTimeMillis()); socketIOClient.sendEvent("message", "ok"); } diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/message/Message.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/message/Message.java deleted file mode 100644 index a30a0c608..000000000 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/message/Message.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.ruoyi.common.websocket.message; - -import lombok.Builder; -import lombok.Data; -import lombok.ToString; - -import java.io.Serializable; - -@Data -@Builder -@ToString -public class Message implements Serializable { - - private String type; - - private T body; - -} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/message/RegisterMessage.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/message/RegisterMessage.java new file mode 100644 index 000000000..eb113ddde --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/message/RegisterMessage.java @@ -0,0 +1,29 @@ +package com.ruoyi.common.websocket.message; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +import java.io.Serializable; + +@Data +@ToString +@NoArgsConstructor +public class RegisterMessage implements Serializable { + + private String type; + + private String product; + + private String interval = "1m"; + + @JsonCreator + public static RegisterMessage create(String json) throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(json, RegisterMessage.class); + } + +} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/session/model/SocketIOSession.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/session/model/SocketIOSession.java index 1c0e35a17..59a5251bb 100644 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/session/model/SocketIOSession.java +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/session/model/SocketIOSession.java @@ -31,6 +31,11 @@ public class SocketIOSession { */ private String topic; + /** + * 时间间隔 + */ + private String interval; + /** * 会话 */ diff --git a/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/crawler/ProductPriceCrawlerJob.java b/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/crawler/ProductPriceCrawlerJob.java index 693df1fb8..226216403 100644 --- a/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/crawler/ProductPriceCrawlerJob.java +++ b/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/crawler/ProductPriceCrawlerJob.java @@ -93,7 +93,9 @@ private void doCrawler(String productCode) { return; } boolean hasPriceChange = productPriceHelper.doRefreshPrice(productCode, new BigDecimal(currentPrice), new Date((System.currentTimeMillis() / 60000) * 60000)); - log.info("产品价格波动:{},准备发送通知,当前会话数量{}", productCode, sessionPool.sessionCount()); + if (log.isDebugEnabled()) { + log.info("产品价格波动:{},准备发送通知,当前会话数量{}", productCode, sessionPool.sessionCount()); + } if (!hasPriceChange || sessionPool.sessionCount() <= 0) { return; } diff --git a/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/crawler/core/helper/ProductPriceHelper.java b/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/crawler/core/helper/ProductPriceHelper.java index 3979729bf..55ec7b4ee 100644 --- a/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/crawler/core/helper/ProductPriceHelper.java +++ b/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/crawler/core/helper/ProductPriceHelper.java @@ -93,7 +93,9 @@ private void doRefreshPrice(ProductPriceCache productPriceCache, MarketPriceType } //log.info("时间差值:{}, 时间间隔:{}", productPriceCache.getCurrentTime().getTime() - productPrice.getTimestamp(), priceType.getInterval()); if (productPriceCache.getCurrentTime().getTime() - productPrice.getTimestamp() >= priceType.getInterval()) { - log.info("跨时间段,将数据重新塞回列表:{}", productPrice); + if (log.isDebugEnabled()) { + log.debug("跨时间段,将数据重新塞回列表:{}", productPrice); + } productPrice.setClose(productPriceCache.getCurrentPrice()); productPrice.setHigh(productPrice.getHigh().compareTo(productPriceCache.getCurrentPrice()) >= 0 ? productPrice.getHigh() : productPriceCache.getCurrentPrice()); productPrice.setLow(productPrice.getLow().compareTo(productPriceCache.getCurrentPrice()) <= 0 ? productPrice.getLow() : productPriceCache.getCurrentPrice()); @@ -111,7 +113,9 @@ private void doRefreshPrice(ProductPriceCache productPriceCache, MarketPriceType productPrice.setLow(productPrice.getLow().compareTo(productPriceCache.getCurrentPrice()) <= 0 ? productPrice.getLow() : productPriceCache.getCurrentPrice()); productPrice.setClose(productPriceCache.getCurrentPrice()); } - log.info("刷新产品价格信息:{}", productPrice); + if (log.isDebugEnabled()) { + log.debug("刷新产品价格信息:{}", productPrice); + } redisService.pushLastObject(productPriceKey, productPrice); redisService.expire(productPriceKey, 1, TimeUnit.DAYS); } diff --git a/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/websocket/WebSocketMessageSender.java b/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/websocket/WebSocketMessageSender.java index ae501de43..61ed4b0c0 100644 --- a/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/websocket/WebSocketMessageSender.java +++ b/ruoyi-modules/ruoyi-market/src/main/java/com/ruoyi/market/websocket/WebSocketMessageSender.java @@ -33,10 +33,10 @@ public class WebSocketMessageSender { private IProductInfoService productInfoService; public void send(SocketIOSession session, String productCode) { - if (StringUtils.isBlank(session.getTopic())) { + if (StringUtils.isBlank(session.getTopic()) || StringUtils.isBlank(session.getInterval())) { return; } - String productPriceKey = String.format(PRODUCT_PRICE_INFO_KEY, productCode, MarketPriceTypeEnum.MK_1M.getKey()); + String productPriceKey = String.format(PRODUCT_PRICE_INFO_KEY, productCode, session.getInterval()); ProductKLineCache productPrice = redisService.getLastObject(productPriceKey, ProductKLineCache.class); if (productPrice == null) { return; @@ -56,7 +56,7 @@ public void send(SocketIOSession session, String productCode) { message.setHigh(productPrice.getHigh()); message.setRange(productPriceCache.getRange()); if (session.getTopic().equals(productCode)) { - session.getClient().sendEvent("message", JSON.toJSONString(message)); + session.getClient().sendEvent("message", message); return; } List productInfos = productInfoService.selectProductInfoByCategory(session.getTopic()); @@ -65,7 +65,7 @@ public void send(SocketIOSession session, String productCode) { } boolean isRegister = productInfos.stream().map(ProductInfo::getProductCode).collect(Collectors.toSet()).contains(productCode); if (isRegister) { - session.getClient().sendEvent("message", JSON.toJSONString(message)); + session.getClient().sendEvent("message", message); } }