Skip to content

Commit

Permalink
Merge pull request #105 from gitxiaofeng/master
Browse files Browse the repository at this point in the history
feature:增加服务端主动心跳逻辑
  • Loading branch information
gitxiaofeng authored Jul 15, 2021
2 parents 2c07bcf + 59c7c52 commit 6f0e429
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public CaseRoom(Long id) {
public Player createAndAddPlayer(Client client) {
Player p = super.createAndAddPlayer(client);
String caseContent = testCaseContent != null ? testCaseContent : testCase.getCaseContent();
p.getClient().sendMessage(caseContent);
p.getClient().sendMessage(CaseMessageType.EDITOR, caseContent);
LOGGER.info(Thread.currentThread().getName() + ": 新的用户加入成功,传输用例内容: " + testCaseContent);
return p;
}
Expand Down Expand Up @@ -51,6 +51,6 @@ protected void internalRemovePlayer(Player p) {
}

// 广播有用户离开
broadcastRoomMessage("当前用户数:" + players.size() + "。用例编辑者 " + p.getClient().getClientName() + " 离开");
broadcastRoomMessage(CaseMessageType.NOTIFY, "当前用户数:" + players.size() + "。用例编辑者 " + p.getClient().getClientName() + " 离开");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public Long getRecordId() {

public void close() {
LOGGER.info(Thread.currentThread().getName() + ": client " + this.getClientName() + " 准备退出。" + this.session.getId());
sendMessage(new String(CaseWsMessages.CLIENT_CLOSE.getMsg()));
sendMessage(CaseMessageType.CTRL, new String(CaseWsMessages.CLIENT_CLOSE.getMsg()));
}

public void sendMessage(String msg) {
if (!msg.contains(CaseWsMessages.PONG.getMsg())) {
public void sendMessage(CaseMessageType type, String msg) {
if (!type.equals(CaseMessageType.PING)) {
LOGGER.info(Thread.currentThread().getName() + ": 准备向 " + this.getClientName() + " 发送消息:" + msg);
}
synchronized (messagesToSend) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Player createAndAddPlayer(Client client) {
if (p.getRoom().players.size() <= 1) {
mergeRecoed(p.getClient().getRecordId(), caseContent);
}
p.getClient().sendMessage(testCaseContent);
p.getClient().sendMessage(CaseMessageType.EDITOR, testCaseContent);
LOGGER.info(Thread.currentThread().getName() + ": 新的用户加入成功,传输用例内容: " + testCaseContent);
return p;
}
Expand Down Expand Up @@ -97,7 +97,7 @@ protected void internalRemovePlayer(Player p) {
}

// 广播有用户离开
broadcastRoomMessage( "当前用户数:" + players.size() + "。用例执行者 " + p.getClient().getClientName() + " 离开");
broadcastRoomMessage(CaseMessageType.NOTIFY, "当前用户数:" + players.size() + "。用例执行者 " + p.getClient().getClientName() + " 离开");

}

Expand Down
36 changes: 27 additions & 9 deletions case-server/src/main/java/com/xiaoju/framework/handler/Room.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Player createAndAddPlayer(Client client) {
Player p = new Player(this, client);

// 通知消息
broadcastRoomMessage( "当前用户数: " + (players.size() + 1) + "。新用户是:" + client.getClientName());
broadcastRoomMessage(CaseMessageType.NOTIFY, "当前用户数: " + (players.size() + 1) + "。新用户是:" + client.getClientName());

players.add(p);
cs.put(client.getSession(), client);
Expand All @@ -120,7 +120,7 @@ public Player createAndAddPlayer(Client client) {

// 发送当前用户数
String content = String.valueOf(players.size());
p.sendRoomMessageSync("当前用户数:" + content);
p.sendRoomMessageSync(CaseMessageType.NOTIFY, "当前用户数:" + content);

return p;
}
Expand All @@ -147,9 +147,10 @@ protected void internalRemovePlayer(Player p) {
}

// 直接广播发送内容,不经过buffer池。适用于所有消息都是一致的场景。
protected void broadcastRoomMessage(String content) {
protected void broadcastRoomMessage(CaseMessageType type, String content) {

for (Player p : players) {
p.sendRoomMessageSync(content);
p.sendRoomMessageSync(type, content);
}
}

Expand Down Expand Up @@ -184,7 +185,7 @@ private void broadcastMessage(String msg) {
for (Player p : players) {
String s = String.valueOf(p.getLastReceivedMessageId())
+ "," + msgStr;
p.sendRoomMessageSync(s); // 直接发送,不放到buffer
p.sendRoomMessageSync(CaseMessageType.EDITOR, s); // 直接发送,不放到buffer
}
} else {
int seperateIndex = msg.indexOf('|');
Expand Down Expand Up @@ -227,7 +228,7 @@ private void broadcastTimerTick() {

caseMessages.clear();

p.sendRoomMessageSync(sb.toString());
p.sendRoomMessageSync(CaseMessageType.EDITOR, sb.toString());
}
}

Expand Down Expand Up @@ -293,6 +294,8 @@ public static final class Player {
private final Client client;
private final long enterTimeStamp;

private Integer pingCount;

// private final boolean isRecord;

/**
Expand All @@ -304,10 +307,19 @@ private List<String> getBufferedMessages() {
return bufferedMessages;
}

public boolean isPingNormal() {
return pingCount <= 2;
}

public void clearPingCount() {
this.pingCount = 0;
}

private Player(Room room, Client client) {
this.room = room;
this.client = client;
this.enterTimeStamp = System.currentTimeMillis();
this.pingCount = 0;
// isRecord = client.getRecordId();
}

Expand Down Expand Up @@ -356,10 +368,16 @@ public void handleCtrlMessage(String msg) {
* 发送room的消息
* @param content
*/
public void sendRoomMessageSync(String content) {
public void sendRoomMessageSync(CaseMessageType type, String content) {
Objects.requireNonNull(content);

client.sendMessage(content);
if (content.equals(CaseWsMessages.PING.getMsg())) {
this.pingCount ++;
if (!isPingNormal()) {
LOGGER.error("服务端ping客户端3次失败,当前用户连接有问题:" + this.getClient().getClientName());
throw new RuntimeException("心跳错误,客户端与服务端连接错误");
}
}
client.sendMessage(type, content);
}

public void sendRoomMessageAsync(String content) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,23 @@ public String toString() {
private static volatile Map<Long, Room> rooms = new ConcurrentHashMap<>();
private static final Object roomLock = new Object();

static {
Thread t = new Thread(() -> {
while(true) {
for (Room room : rooms.values()) {
room.broadcastRoomMessage(CaseMessageType.PING, CaseWsMessages.PING.getMsg());
}
try {
Thread.sleep(15000);
} catch (Exception e) {
LOGGER.error("ping thread sleep error.", e);
}
}
});
t.setDaemon(true);
t.start();
}

public static Room getRoom(boolean create, long id) {
if (create) {
// todo: 清除的逻辑放到线程定时任务中
Expand Down Expand Up @@ -110,7 +127,7 @@ public void run() {
}
LOGGER.info(Thread.currentThread().getName() + ": player " + client.getClientName() + " 加入: " + player);
} catch (IllegalStateException e) {
client.sendMessage(new String("0" + e.getMessage()));
client.sendMessage(CaseMessageType.NOTIFY, new String("0" + e.getMessage()));
client.close();
}
} catch (RuntimeException e) {
Expand Down Expand Up @@ -171,7 +188,13 @@ public void run() {
String messageContent = message.substring(1);
switch (messageType) {
case '0': // 处理ping/pong消息
room.cs.get(session).sendMessage(CaseWsMessages.PONG.getMsg());
if (messageContent.equals(CaseWsMessages.PING.getMsg())) {
room.cs.get(session).sendMessage(CaseMessageType.PING, CaseWsMessages.PONG.getMsg());
} else if (messageContent.equals(CaseWsMessages.PONG.getMsg())) {
player.clearPingCount();
} else {
LOGGER.error(Thread.currentThread().getName() + "ping pong 信息有误。消息是:" + message);
}
break;

case '1': // 处理编辑消息
Expand All @@ -189,22 +212,22 @@ public void run() {

if (messageContent.equals("lock")) { // lock消息
if (player.getRoom().getLock()) {
player.sendRoomMessageSync("2" + "failed" + "已经被人锁住了"); // 当前已经lock了,后续可以发送详细锁住人信息
player.sendRoomMessageSync(CaseMessageType.NOTIFY,"2" + "failed" + "已经被人锁住了"); // 当前已经lock了,后续可以发送详细锁住人信息
return;
} else {
player.getRoom().lock();
player.getRoom().setLocker(session.getId());
}
} else if(messageContent.equals("unlock")) {
if (!player.getRoom().getLock()) { // 当前已经unlock状态
player.sendRoomMessageSync("2" + "failed" + " 当前已经是解锁状态");
player.sendRoomMessageSync(CaseMessageType.NOTIFY,"2" + "failed" + " 当前已经是解锁状态");
return;
} else {
if (player.getRoom().getLocker().equals(session.getId())) { // 自己锁的
player.getRoom().unlock();
player.getRoom().setLocker("");
} else {// 其他人锁的
player.sendRoomMessageSync("2" + "failed" + "当前被其他人锁住了");
player.sendRoomMessageSync(CaseMessageType.NOTIFY,"2" + "failed" + "当前被其他人锁住了");
return;
}
}
Expand Down

0 comments on commit 6f0e429

Please sign in to comment.