Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
xingxing-dd committed Sep 3, 2023
2 parents 6dc63b8 + 7587352 commit 425cf88
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,14 @@ public LoginUser login(String username, String password) {
}

public void register(String username, String password) {

if (StringUtils.isAnyBlank(username, password))
{
throw new ServiceException("username/password is not blank");
}
ClientUser clientUser = new ClientUser();
clientUser.setUserName(username);
clientUser.setPassword(password);
remoteClientUserService.registerUserInfo(clientUser, SecurityConstants.INNER);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.ruoyi.common.core.constant.TokenConstants;
import com.ruoyi.common.websocket.message.RegisterMessage;
import com.ruoyi.common.websocket.session.SocketIOSessionPool;
import com.ruoyi.common.websocket.session.model.SocketIOSession;
import com.sun.nio.sctp.MessageInfo;
Expand Down Expand Up @@ -48,12 +49,14 @@ public void onHeartbeat(SocketIOClient socketIOClient, AckRequest ackRequest) {
}

@OnEvent("registry")
public void onRegistry(SocketIOClient socketIOClient, String registry) {
public void onRegistry(SocketIOClient socketIOClient, RegisterMessage message) {
log.info("接收到消息注册:{}", message);
SocketIOSession socketIOSession = sessionPool.getSession(socketIOClient.getSessionId().toString());
if (socketIOSession == null) {
return;
}
socketIOSession.setTopic(registry);
socketIOSession.setTopic(message.getProduct());
socketIOSession.setInterval(message.getInterval());
socketIOSession.setHeartbeatTime(System.currentTimeMillis());
socketIOClient.sendEvent("message", "ok");
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.ruoyi.common.websocket.message;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.io.Serializable;

@Data
@ToString
@NoArgsConstructor
public class RegisterMessage implements Serializable {

private String type;

private String product;

private String interval = "1m";

@JsonCreator
public static RegisterMessage create(String json) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(json, RegisterMessage.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public class SocketIOSession {
*/
private String topic;

/**
* 时间间隔
*/
private String interval;

/**
* 会话
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ private void doCrawler(String productCode) {
return;
}
boolean hasPriceChange = productPriceHelper.doRefreshPrice(productCode, new BigDecimal(currentPrice), new Date((System.currentTimeMillis() / 60000) * 60000));
log.info("产品价格波动:{},准备发送通知,当前会话数量{}", productCode, sessionPool.sessionCount());
if (log.isDebugEnabled()) {
log.info("产品价格波动:{},准备发送通知,当前会话数量{}", productCode, sessionPool.sessionCount());
}
if (!hasPriceChange || sessionPool.sessionCount() <= 0) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ private void doRefreshPrice(ProductPriceCache productPriceCache, MarketPriceType
}
//log.info("时间差值:{}, 时间间隔:{}", productPriceCache.getCurrentTime().getTime() - productPrice.getTimestamp(), priceType.getInterval());
if (productPriceCache.getCurrentTime().getTime() - productPrice.getTimestamp() >= priceType.getInterval()) {
log.info("跨时间段,将数据重新塞回列表:{}", productPrice);
if (log.isDebugEnabled()) {
log.debug("跨时间段,将数据重新塞回列表:{}", productPrice);
}
productPrice.setClose(productPriceCache.getCurrentPrice());
productPrice.setHigh(productPrice.getHigh().compareTo(productPriceCache.getCurrentPrice()) >= 0 ? productPrice.getHigh() : productPriceCache.getCurrentPrice());
productPrice.setLow(productPrice.getLow().compareTo(productPriceCache.getCurrentPrice()) <= 0 ? productPrice.getLow() : productPriceCache.getCurrentPrice());
Expand All @@ -111,7 +113,9 @@ private void doRefreshPrice(ProductPriceCache productPriceCache, MarketPriceType
productPrice.setLow(productPrice.getLow().compareTo(productPriceCache.getCurrentPrice()) <= 0 ? productPrice.getLow() : productPriceCache.getCurrentPrice());
productPrice.setClose(productPriceCache.getCurrentPrice());
}
log.info("刷新产品价格信息:{}", productPrice);
if (log.isDebugEnabled()) {
log.debug("刷新产品价格信息:{}", productPrice);
}
redisService.pushLastObject(productPriceKey, productPrice);
redisService.expire(productPriceKey, 1, TimeUnit.DAYS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ public class WebSocketMessageSender {
private IProductInfoService productInfoService;

public void send(SocketIOSession session, String productCode) {
if (StringUtils.isBlank(session.getTopic())) {
if (StringUtils.isBlank(session.getTopic()) || StringUtils.isBlank(session.getInterval())) {
return;
}
String productPriceKey = String.format(PRODUCT_PRICE_INFO_KEY, productCode, MarketPriceTypeEnum.MK_1M.getKey());
String productPriceKey = String.format(PRODUCT_PRICE_INFO_KEY, productCode, session.getInterval());
ProductKLineCache productPrice = redisService.getLastObject(productPriceKey, ProductKLineCache.class);
if (productPrice == null) {
return;
Expand All @@ -56,7 +56,7 @@ public void send(SocketIOSession session, String productCode) {
message.setHigh(productPrice.getHigh());
message.setRange(productPriceCache.getRange());
if (session.getTopic().equals(productCode)) {
session.getClient().sendEvent("message", JSON.toJSONString(message));
session.getClient().sendEvent("message", message);
return;
}
List<ProductInfo> productInfos = productInfoService.selectProductInfoByCategory(session.getTopic());
Expand All @@ -65,7 +65,7 @@ public void send(SocketIOSession session, String productCode) {
}
boolean isRegister = productInfos.stream().map(ProductInfo::getProductCode).collect(Collectors.toSet()).contains(productCode);
if (isRegister) {
session.getClient().sendEvent("message", JSON.toJSONString(message));
session.getClient().sendEvent("message", message);
}
}

Expand Down

0 comments on commit 425cf88

Please sign in to comment.