From 70f24c67f26fe88c8bfeed8aeb8beccbfe894bed Mon Sep 17 00:00:00 2001 From: Egor Egorov Date: Tue, 20 Mar 2018 15:56:39 +1300 Subject: [PATCH 1/7] Removed reconnection handler class --- .../com/clusterws/ReconnectionHandler.java | 89 ------------------- 1 file changed, 89 deletions(-) delete mode 100644 src/main/java/com/clusterws/ReconnectionHandler.java diff --git a/src/main/java/com/clusterws/ReconnectionHandler.java b/src/main/java/com/clusterws/ReconnectionHandler.java deleted file mode 100644 index ab50a76..0000000 --- a/src/main/java/com/clusterws/ReconnectionHandler.java +++ /dev/null @@ -1,89 +0,0 @@ -package com.clusterws; - -import org.java_websocket.WebSocket; - -import java.util.ArrayList; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ThreadLocalRandom; - -class ReconnectionHandler { - private final boolean mAutoReconnect; - private final int mReconnectionIntervalMin; - private final int mReconnectionIntervalMax; - private final int mReconnectionAttempts; - private boolean mInReconnectionState; - private int mReconnectionsAttempted; - private Timer mReconnectionTimer; - private Timer mTimerOff; - private ClusterWS mSocket; - - ReconnectionHandler(Boolean autoReconnect, Integer reconnectionIntervalMin, Integer reconnectionIntervalMax, Integer reconnectionAttempts, ClusterWS socket) { - mAutoReconnect = autoReconnect != null ? autoReconnect : false; - mReconnectionIntervalMin = reconnectionIntervalMin != null ? reconnectionIntervalMin : 1000; - mReconnectionIntervalMax = reconnectionIntervalMax != null ? reconnectionIntervalMax : 5000; - mReconnectionAttempts = reconnectionAttempts != null ? reconnectionAttempts : 0; - mSocket = socket; - mInReconnectionState = false; - mReconnectionsAttempted = 0; - } - - void onOpen(){ - if (mReconnectionTimer != null) { - mReconnectionTimer.cancel(); - mReconnectionTimer = null; - } - - if (mTimerOff != null) { - mTimerOff.cancel(); - mTimerOff = null; - } - - mInReconnectionState = false; - mReconnectionsAttempted = 0; - List channels = mSocket.getChannels(); - - for (Channel channel : - channels) { - channel.subscribe(); - } - } - - void reconnect(){ - mInReconnectionState = true; - mReconnectionTimer = new Timer(); - mReconnectionTimer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - if (mSocket.getState() == WebSocket.READYSTATE.CLOSED) { - mReconnectionsAttempted++; - if (mReconnectionAttempts != 0 && mReconnectionsAttempted >= mReconnectionAttempts) { - cancel(); - mInReconnectionState = false; - } else { - if (mTimerOff != null) { - mTimerOff.cancel(); - } - mTimerOff = new Timer(); - int randomDelay = ThreadLocalRandom.current().nextInt(1, mReconnectionIntervalMax- mReconnectionIntervalMin + 1); - mTimerOff.schedule(new TimerTask() { - @Override - public void run() { - mSocket.connect(); - } - }, randomDelay); - } - } - } - }, 0, mReconnectionIntervalMin); - } - - boolean isInReconnectionState() { - return mInReconnectionState; - } - - boolean isAutoReconnect() { - return mAutoReconnect; - } -} From 7e9ba09dd7efe3151eb3091b8eaadd0d023387b7 Mon Sep 17 00:00:00 2001 From: Egor Egorov Date: Tue, 20 Mar 2018 15:58:05 +1300 Subject: [PATCH 2/7] Updated ClusterWS icon in Readme --- docs/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/README.md b/docs/README.md index b477134..c8e5725 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,7 +2,7 @@
Build Scalable Node.js WebSocket Applications

- +

From 883ab7385329af91636d30713c36bdf63a248c50 Mon Sep 17 00:00:00 2001 From: Egor Egorov Date: Tue, 20 Mar 2018 15:58:58 +1300 Subject: [PATCH 3/7] Small refactoring --- src/main/java/com/clusterws/Channel.java | 12 +++++----- src/main/java/com/clusterws/Emitter.java | 16 ++++++------- .../com/clusterws/IClusterWSListener.java | 2 ++ .../java/com/clusterws/ISocketEvents.java | 4 ++++ .../java/com/clusterws/MessageHandler.java | 23 +++++++++---------- src/main/java/com/clusterws/PingHandler.java | 2 +- src/main/java/com/clusterws/Socket.java | 2 +- 7 files changed, 33 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/clusterws/Channel.java b/src/main/java/com/clusterws/Channel.java index 0242d83..6ab8efe 100644 --- a/src/main/java/com/clusterws/Channel.java +++ b/src/main/java/com/clusterws/Channel.java @@ -3,7 +3,7 @@ import java.util.List; public class Channel { - public interface IChannelListener{ + public interface IChannelListener { void onDataReceived(String channelName, Object data); } @@ -16,13 +16,13 @@ public Channel(String channelName, ClusterWS clusterWS) { mClusterWS = clusterWS; } - public Channel watch(IChannelListener listener){ + public Channel watch(IChannelListener listener) { mChannelListener = listener; return this; } - public Channel publish(Object data){ - mClusterWS.send(mChannelName,data,"publish"); + public Channel publish(Object data) { + mClusterWS.send(mChannelName, data, "publish"); return this; } @@ -42,7 +42,7 @@ void onMessage(Object data) { } } - void subscribe(){ - mClusterWS.send("subscribe",mChannelName,"system"); + void subscribe() { + mClusterWS.send("subscribe", mChannelName, "system"); } } diff --git a/src/main/java/com/clusterws/Emitter.java b/src/main/java/com/clusterws/Emitter.java index 74e3767..90f3ad6 100644 --- a/src/main/java/com/clusterws/Emitter.java +++ b/src/main/java/com/clusterws/Emitter.java @@ -3,28 +3,28 @@ import java.util.concurrent.ConcurrentHashMap; class Emitter { - private ConcurrentHashMap mEvents; + private ConcurrentHashMap mEvents; Emitter() { mEvents = new ConcurrentHashMap<>(); } - void addEventListener(String event, IEmitterListener listener){ - if (mEvents.containsKey(event)){ - mEvents.replace(event,listener); + void addEventListener(String event, IEmitterListener listener) { + if (mEvents.containsKey(event)) { + mEvents.replace(event, listener); } else { - mEvents.put(event,listener); + mEvents.put(event, listener); } } - void emit(String event, Object object){ + void emit(String event, Object object) { IEmitterListener listener = mEvents.get(event); - if (listener != null){ + if (listener != null) { listener.onDataReceived(object); } } - void removeAllEvents(){ + void removeAllEvents() { mEvents = new ConcurrentHashMap<>(); } } diff --git a/src/main/java/com/clusterws/IClusterWSListener.java b/src/main/java/com/clusterws/IClusterWSListener.java index dfade6c..988a639 100644 --- a/src/main/java/com/clusterws/IClusterWSListener.java +++ b/src/main/java/com/clusterws/IClusterWSListener.java @@ -2,6 +2,8 @@ public interface IClusterWSListener { void onConnected(); + void onError(Exception exception); + void onDisconnected(int code, String reason); } diff --git a/src/main/java/com/clusterws/ISocketEvents.java b/src/main/java/com/clusterws/ISocketEvents.java index 7fbd416..4f7e27f 100644 --- a/src/main/java/com/clusterws/ISocketEvents.java +++ b/src/main/java/com/clusterws/ISocketEvents.java @@ -4,9 +4,13 @@ public interface ISocketEvents { void onOpen(); + void onError(Exception exception); + void onClose(int code, String reason); + void onBinaryMessage(ByteBuffer bytes); + void onMessage(String message); } diff --git a/src/main/java/com/clusterws/MessageHandler.java b/src/main/java/com/clusterws/MessageHandler.java index 51b013c..44018d8 100644 --- a/src/main/java/com/clusterws/MessageHandler.java +++ b/src/main/java/com/clusterws/MessageHandler.java @@ -4,16 +4,15 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import java.util.ArrayList; import java.util.List; import java.util.TimerTask; class MessageHandler { - String messageEncode(String event, Object data, String type){ + String messageEncode(String event, Object data, String type) { JSONObject jsonObject = new JSONObject(); JSONArray jsonArray = new JSONArray(); - switch (type){ + switch (type) { case "publish": jsonArray.add("p"); jsonArray.add(event); @@ -27,18 +26,18 @@ String messageEncode(String event, Object data, String type){ jsonObject.put("#", jsonArray); return jsonObject.toJSONString(); case "system": - switch (event){ + switch (event) { case "subscribe": jsonArray.add("s"); jsonArray.add("s"); jsonArray.add(data); - jsonObject.put("#",jsonArray); + jsonObject.put("#", jsonArray); return jsonObject.toJSONString(); case "unsubscribe": jsonArray.add("s"); jsonArray.add("u"); jsonArray.add(data); - jsonObject.put("#",jsonArray); + jsonObject.put("#", jsonArray); return jsonObject.toJSONString(); } case "ping": @@ -48,9 +47,9 @@ String messageEncode(String event, Object data, String type){ } } - void messageDecode(final ClusterWS socket, String message){ + void messageDecode(final ClusterWS socket, String message) { JSONArray jsonArray = JSON.parseObject(message).getJSONArray("#"); - switch (jsonArray.getString(0)){ + switch (jsonArray.getString(0)) { case "p": // List channelArrayList = socket.getChannels(); @@ -64,10 +63,10 @@ void messageDecode(final ClusterWS socket, String message){ } break; case "e": - socket.getEmitter().emit(jsonArray.getString(1),jsonArray.get(2)); + socket.getEmitter().emit(jsonArray.getString(1), jsonArray.get(2)); break; case "s": - if (jsonArray.getString(1).equals("c")){ + if (jsonArray.getString(1).equals("c")) { socket.getPingHandler().getPingTimer().scheduleAtFixedRate(new TimerTask() { @Override public void run() { @@ -78,10 +77,10 @@ public void run() { cancel(); } } - },0,jsonArray.getJSONObject(2).getInteger("ping")); + }, 0, jsonArray.getJSONObject(2).getInteger("ping")); boolean useBinary = jsonArray.getJSONObject(2).getBoolean("binary"); socket.setUseBinary(useBinary); - if (socket.getClusterWSListener() != null){ + if (socket.getClusterWSListener() != null) { socket.getClusterWSListener().onConnected(); } } diff --git a/src/main/java/com/clusterws/PingHandler.java b/src/main/java/com/clusterws/PingHandler.java index 5d9b26d..5572ace 100644 --- a/src/main/java/com/clusterws/PingHandler.java +++ b/src/main/java/com/clusterws/PingHandler.java @@ -15,7 +15,7 @@ void incrementMissedPing() { mMissedPing++; } - void setMissedPingToZero(){ + void setMissedPingToZero() { mMissedPing = 0; } diff --git a/src/main/java/com/clusterws/Socket.java b/src/main/java/com/clusterws/Socket.java index 4eaca7e..99d31ac 100644 --- a/src/main/java/com/clusterws/Socket.java +++ b/src/main/java/com/clusterws/Socket.java @@ -27,7 +27,7 @@ public void onMessage(String message) { @Override public void onClose(int code, String reason, boolean remote) { - mSocketEvents.onClose(code,reason); + mSocketEvents.onClose(code, reason); } @Override From 6cb48ec87c6d68124b627aa55ef3df468cfe9828 Mon Sep 17 00:00:00 2001 From: Egor Egorov Date: Tue, 20 Mar 2018 15:59:18 +1300 Subject: [PATCH 4/7] Remade reconnection in ClusterWS --- src/main/java/com/clusterws/ClusterWS.java | 53 ++++++++++++++-------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/clusterws/ClusterWS.java b/src/main/java/com/clusterws/ClusterWS.java index 5d7c714..b77af17 100644 --- a/src/main/java/com/clusterws/ClusterWS.java +++ b/src/main/java/com/clusterws/ClusterWS.java @@ -7,6 +7,9 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ThreadLocalRandom; public class ClusterWS { private Socket mSocket; @@ -17,20 +20,30 @@ public class ClusterWS { private MessageHandler mMessageHandler; private PingHandler mPingHandler; private List mChannels; - private ReconnectionHandler mReconnectionHandler; + private ReconnectionParams mReconnectionParams; public ClusterWS(String url) { - if (url == null){ + if (url == null) { throw new NullPointerException("Url must be provided"); } mUrl = url; mChannels = new ArrayList<>(); - mReconnectionHandler = new ReconnectionHandler(null, null, null, null, this); + mReconnectionParams = new ReconnectionParams( + false, + null, + null, + null); createSocket(); } - public ClusterWS setReconnection(Boolean autoReconnect, Integer reconnectionIntervalMin, Integer reconnectionIntervalMax, Integer reconnectionAttempts) { - mReconnectionHandler = new ReconnectionHandler(autoReconnect, reconnectionIntervalMin, reconnectionIntervalMax, reconnectionAttempts, this); + public ClusterWS setReconnection(Boolean autoReconnect, + Integer reconnectionIntervalMin, + Integer reconnectionIntervalMax, + Integer reconnectionAttempts) { + mReconnectionParams = new ReconnectionParams(autoReconnect, + reconnectionIntervalMin, + reconnectionIntervalMax, + reconnectionAttempts); return this; } @@ -53,7 +66,7 @@ public void on(String event, IEmitterListener listener) { } public void send(String event, Object data) { - send(event,data,"emit"); + send(event, data, "emit"); } public WebSocket.READYSTATE getState() { @@ -117,7 +130,7 @@ private void createSocket() { mSocket = new Socket(URI.create(mUrl), new ISocketEvents() { @Override public void onOpen() { - mReconnectionHandler.onOpen(); + mClusterWSListener.onConnected(); } @Override @@ -132,17 +145,22 @@ public void onClose(int code, String reason) { if (mPingHandler.getPingTimer() != null) { mPingHandler.getPingTimer().cancel(); } - - if (mReconnectionHandler.isInReconnectionState()) { - return; - } - if (mReconnectionHandler.isAutoReconnect() && code != 1000) { - mReconnectionHandler.reconnect(); - } - - if (mClusterWSListener != null) { - mClusterWSListener.onDisconnected(code, reason); + if (mReconnectionParams.isAutoReconnect() && code != 1000 && (mReconnectionParams.getReconnectionAttempts() == 0 || mReconnectionParams.getReconnectionsAttempted() < mReconnectionParams.getReconnectionAttempts())) { + if (mSocket.getReadyState() == WebSocket.READYSTATE.CLOSED || mSocket.getReadyState() == WebSocket.READYSTATE.NOT_YET_CONNECTED) { + mReconnectionParams.incrementReconnectionsAttempted(); + int randomDelay = ThreadLocalRandom.current().nextInt(1, + mReconnectionParams.getReconnectionIntervalMax() - + mReconnectionParams.getReconnectionIntervalMin() + + 1); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + connect(); + } + }, randomDelay); + } } + mClusterWSListener.onDisconnected(code, reason); } @Override @@ -170,5 +188,4 @@ private void onMessageReceived(String message) { mMessageHandler.messageDecode(ClusterWS.this, message); } } - } From 88a81e4ce21cff7c20b9e60c4d8743133848f069 Mon Sep 17 00:00:00 2001 From: Egor Egorov Date: Tue, 20 Mar 2018 15:59:58 +1300 Subject: [PATCH 5/7] Created ReconnectionParams class for holding reconnection values --- .../com/clusterws/ReconnectionParams.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 src/main/java/com/clusterws/ReconnectionParams.java diff --git a/src/main/java/com/clusterws/ReconnectionParams.java b/src/main/java/com/clusterws/ReconnectionParams.java new file mode 100644 index 0000000..1246148 --- /dev/null +++ b/src/main/java/com/clusterws/ReconnectionParams.java @@ -0,0 +1,48 @@ +package com.clusterws; + +public class ReconnectionParams { + private Boolean mAutoReconnect; + private Integer mReconnectionIntervalMin; + private Integer mReconnectionIntervalMax; + private Integer mReconnectionAttempts; + private Integer mReconnectionsAttempted; + + public ReconnectionParams(Boolean autoReconnect, + Integer reconnectionIntervalMin, + Integer reconnectionIntervalMax, + Integer reconnectionAttempts) { + mAutoReconnect = autoReconnect != null ? autoReconnect : false; + mReconnectionIntervalMin = reconnectionIntervalMin != null ? reconnectionIntervalMin : 1000; + mReconnectionIntervalMax = reconnectionIntervalMax != null ? reconnectionIntervalMax : 5000; + mReconnectionAttempts = reconnectionAttempts != null ? reconnectionAttempts : 0; + mReconnectionsAttempted = 0; + } + + public boolean isAutoReconnect() { + return mAutoReconnect; + } + + public Integer getReconnectionIntervalMin() { + return mReconnectionIntervalMin; + } + + public Integer getReconnectionIntervalMax() { + return mReconnectionIntervalMax; + } + + public Integer getReconnectionAttempts() { + return mReconnectionAttempts; + } + + public void incrementReconnectionsAttempted() { + mReconnectionsAttempted++; + } + + public void resetReconnectionsAttempted() { + mReconnectionsAttempted = 0; + } + + public Integer getReconnectionsAttempted() { + return mReconnectionsAttempted; + } +} From 2e6225f601327a1a0882990cb6a4e5b87814e1f9 Mon Sep 17 00:00:00 2001 From: Egor Egorov Date: Tue, 20 Mar 2018 16:16:02 +1300 Subject: [PATCH 6/7] Updated badges --- docs/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/README.md b/docs/README.md index c8e5725..111d1a2 100644 --- a/docs/README.md +++ b/docs/README.md @@ -6,7 +6,9 @@

- + GitHub license + + Maintain

From 8973ad3d576a6a44af2b6996c39eb8924b6ad71d Mon Sep 17 00:00:00 2001 From: Egor Egorov Date: Sun, 25 Mar 2018 14:03:57 +1300 Subject: [PATCH 7/7] Updated README Updated ClusterWS logo and Wiki text --- docs/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/README.md b/docs/README.md index 111d1a2..a612746 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,7 +2,7 @@

Build Scalable Node.js WebSocket Applications

- +

@@ -17,5 +17,5 @@

- ClusterWS Java Client Documentation + Find more about ClusterWS JavaScript Client in Wiki Documentation

\ No newline at end of file