Skip to content
This repository has been archived by the owner on Nov 2, 2020. It is now read-only.

Commit

Permalink
Merge branch 'next'
Browse files Browse the repository at this point in the history
  • Loading branch information
Yegorisa committed Mar 25, 2018
2 parents 1973a1a + 8973ad3 commit 88f1333
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 138 deletions.
8 changes: 5 additions & 3 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
<h6 align="center">Build Scalable Node.js WebSocket Applications</h6>

<p align="center">
<img src="https://cdn.rawgit.com/goriunov/159120ca6a883d8d4e75543ec395d361/raw/f4c3c36ac1ab75beedcf73312272b60dac33ecfa/clusterws.svg" width="500">
<img src="https://cdn.rawgit.com/goriunov/159120ca6a883d8d4e75543ec395d361/raw/146220360173a2428fceb44e7fc9b2cda8a17832/clusterws.svg" width="450">
</p>

<p align="center">
<a title="JitPack Version" href="https://jitpack.io/#ClusterWS/ClusterWS-Client-Java"><img src="https://jitpack.io/v/ClusterWS/ClusterWS-Client-Java.svg"></a>
<a href="https://github.com/ClusterWS/ClusterWS-Client-Java/blob/master/LICENSE"><img src="https://img.shields.io/github/license/ClusterWS/ClusterWS-Client-JS.svg?style=for-the-badge" alt="GitHub license" /></a>
<a title="JitPack Version" href="https://jitpack.io/#ClusterWS/ClusterWS-Client-Java"><img src="https://img.shields.io/badge/JitPack-1.6.0-brightgreen.svg?longCache=true&style=for-the-badge"></a>
<a href="https://github.com/ClusterWS/ClusterWS-Client-JS/graphs/commit-activity"><img src="https://img.shields.io/badge/Maintain-Yes-green.svg?style=for-the-badge" alt="Maintain" /></a>
</p>

<p align="center">
Expand All @@ -15,5 +17,5 @@

<h1></h1>
<h3 align="center">
<a href="https://github.com/ClusterWS/ClusterWS-Client-Java/wiki"><strong>ClusterWS Java Client Documentation</strong></a>
Find more about ClusterWS JavaScript Client in <a href="https://github.com/ClusterWS/ClusterWS-Client-Java/wiki"><strong>Wiki Documentation</strong></a>
</h3>
12 changes: 6 additions & 6 deletions src/main/java/com/clusterws/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.List;

public class Channel {
public interface IChannelListener{
public interface IChannelListener {
void onDataReceived(String channelName, Object data);
}

Expand All @@ -16,13 +16,13 @@ public Channel(String channelName, ClusterWS clusterWS) {
mClusterWS = clusterWS;
}

public Channel watch(IChannelListener listener){
public Channel watch(IChannelListener listener) {
mChannelListener = listener;
return this;
}

public Channel publish(Object data){
mClusterWS.send(mChannelName,data,"publish");
public Channel publish(Object data) {
mClusterWS.send(mChannelName, data, "publish");
return this;
}

Expand All @@ -42,7 +42,7 @@ void onMessage(Object data) {
}
}

void subscribe(){
mClusterWS.send("subscribe",mChannelName,"system");
void subscribe() {
mClusterWS.send("subscribe", mChannelName, "system");
}
}
53 changes: 35 additions & 18 deletions src/main/java/com/clusterws/ClusterWS.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ThreadLocalRandom;

public class ClusterWS {
private Socket mSocket;
Expand All @@ -17,20 +20,30 @@ public class ClusterWS {
private MessageHandler mMessageHandler;
private PingHandler mPingHandler;
private List<Channel> mChannels;
private ReconnectionHandler mReconnectionHandler;
private ReconnectionParams mReconnectionParams;

public ClusterWS(String url) {
if (url == null){
if (url == null) {
throw new NullPointerException("Url must be provided");
}
mUrl = url;
mChannels = new ArrayList<>();
mReconnectionHandler = new ReconnectionHandler(null, null, null, null, this);
mReconnectionParams = new ReconnectionParams(
false,
null,
null,
null);
createSocket();
}

public ClusterWS setReconnection(Boolean autoReconnect, Integer reconnectionIntervalMin, Integer reconnectionIntervalMax, Integer reconnectionAttempts) {
mReconnectionHandler = new ReconnectionHandler(autoReconnect, reconnectionIntervalMin, reconnectionIntervalMax, reconnectionAttempts, this);
public ClusterWS setReconnection(Boolean autoReconnect,
Integer reconnectionIntervalMin,
Integer reconnectionIntervalMax,
Integer reconnectionAttempts) {
mReconnectionParams = new ReconnectionParams(autoReconnect,
reconnectionIntervalMin,
reconnectionIntervalMax,
reconnectionAttempts);
return this;
}

Expand All @@ -53,7 +66,7 @@ public void on(String event, IEmitterListener listener) {
}

public void send(String event, Object data) {
send(event,data,"emit");
send(event, data, "emit");
}

public WebSocket.READYSTATE getState() {
Expand Down Expand Up @@ -117,7 +130,7 @@ private void createSocket() {
mSocket = new Socket(URI.create(mUrl), new ISocketEvents() {
@Override
public void onOpen() {
mReconnectionHandler.onOpen();
mClusterWSListener.onConnected();
}

@Override
Expand All @@ -132,17 +145,22 @@ public void onClose(int code, String reason) {
if (mPingHandler.getPingTimer() != null) {
mPingHandler.getPingTimer().cancel();
}

if (mReconnectionHandler.isInReconnectionState()) {
return;
}
if (mReconnectionHandler.isAutoReconnect() && code != 1000) {
mReconnectionHandler.reconnect();
}

if (mClusterWSListener != null) {
mClusterWSListener.onDisconnected(code, reason);
if (mReconnectionParams.isAutoReconnect() && code != 1000 && (mReconnectionParams.getReconnectionAttempts() == 0 || mReconnectionParams.getReconnectionsAttempted() < mReconnectionParams.getReconnectionAttempts())) {
if (mSocket.getReadyState() == WebSocket.READYSTATE.CLOSED || mSocket.getReadyState() == WebSocket.READYSTATE.NOT_YET_CONNECTED) {
mReconnectionParams.incrementReconnectionsAttempted();
int randomDelay = ThreadLocalRandom.current().nextInt(1,
mReconnectionParams.getReconnectionIntervalMax() -
mReconnectionParams.getReconnectionIntervalMin() +
1);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
connect();
}
}, randomDelay);
}
}
mClusterWSListener.onDisconnected(code, reason);
}

@Override
Expand Down Expand Up @@ -170,5 +188,4 @@ private void onMessageReceived(String message) {
mMessageHandler.messageDecode(ClusterWS.this, message);
}
}

}
16 changes: 8 additions & 8 deletions src/main/java/com/clusterws/Emitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,28 @@
import java.util.concurrent.ConcurrentHashMap;

class Emitter {
private ConcurrentHashMap<String,IEmitterListener> mEvents;
private ConcurrentHashMap<String, IEmitterListener> mEvents;

Emitter() {
mEvents = new ConcurrentHashMap<>();
}

void addEventListener(String event, IEmitterListener listener){
if (mEvents.containsKey(event)){
mEvents.replace(event,listener);
void addEventListener(String event, IEmitterListener listener) {
if (mEvents.containsKey(event)) {
mEvents.replace(event, listener);
} else {
mEvents.put(event,listener);
mEvents.put(event, listener);
}
}

void emit(String event, Object object){
void emit(String event, Object object) {
IEmitterListener listener = mEvents.get(event);
if (listener != null){
if (listener != null) {
listener.onDataReceived(object);
}
}

void removeAllEvents(){
void removeAllEvents() {
mEvents = new ConcurrentHashMap<>();
}
}
2 changes: 2 additions & 0 deletions src/main/java/com/clusterws/IClusterWSListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public interface IClusterWSListener {
void onConnected();

void onError(Exception exception);

void onDisconnected(int code, String reason);
}
4 changes: 4 additions & 0 deletions src/main/java/com/clusterws/ISocketEvents.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@

public interface ISocketEvents {
void onOpen();

void onError(Exception exception);

void onClose(int code, String reason);

void onBinaryMessage(ByteBuffer bytes);

void onMessage(String message);

}
23 changes: 11 additions & 12 deletions src/main/java/com/clusterws/MessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.util.ArrayList;
import java.util.List;
import java.util.TimerTask;


class MessageHandler {
String messageEncode(String event, Object data, String type){
String messageEncode(String event, Object data, String type) {
JSONObject jsonObject = new JSONObject();
JSONArray jsonArray = new JSONArray();
switch (type){
switch (type) {
case "publish":
jsonArray.add("p");
jsonArray.add(event);
Expand All @@ -27,18 +26,18 @@ String messageEncode(String event, Object data, String type){
jsonObject.put("#", jsonArray);
return jsonObject.toJSONString();
case "system":
switch (event){
switch (event) {
case "subscribe":
jsonArray.add("s");
jsonArray.add("s");
jsonArray.add(data);
jsonObject.put("#",jsonArray);
jsonObject.put("#", jsonArray);
return jsonObject.toJSONString();
case "unsubscribe":
jsonArray.add("s");
jsonArray.add("u");
jsonArray.add(data);
jsonObject.put("#",jsonArray);
jsonObject.put("#", jsonArray);
return jsonObject.toJSONString();
}
case "ping":
Expand All @@ -48,9 +47,9 @@ String messageEncode(String event, Object data, String type){
}
}

void messageDecode(final ClusterWS socket, String message){
void messageDecode(final ClusterWS socket, String message) {
JSONArray jsonArray = JSON.parseObject(message).getJSONArray("#");
switch (jsonArray.getString(0)){
switch (jsonArray.getString(0)) {
case "p":
//
List<Channel> channelArrayList = socket.getChannels();
Expand All @@ -64,10 +63,10 @@ void messageDecode(final ClusterWS socket, String message){
}
break;
case "e":
socket.getEmitter().emit(jsonArray.getString(1),jsonArray.get(2));
socket.getEmitter().emit(jsonArray.getString(1), jsonArray.get(2));
break;
case "s":
if (jsonArray.getString(1).equals("c")){
if (jsonArray.getString(1).equals("c")) {
socket.getPingHandler().getPingTimer().scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Expand All @@ -78,10 +77,10 @@ public void run() {
cancel();
}
}
},0,jsonArray.getJSONObject(2).getInteger("ping"));
}, 0, jsonArray.getJSONObject(2).getInteger("ping"));
boolean useBinary = jsonArray.getJSONObject(2).getBoolean("binary");
socket.setUseBinary(useBinary);
if (socket.getClusterWSListener() != null){
if (socket.getClusterWSListener() != null) {
socket.getClusterWSListener().onConnected();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/clusterws/PingHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void incrementMissedPing() {
mMissedPing++;
}

void setMissedPingToZero(){
void setMissedPingToZero() {
mMissedPing = 0;
}

Expand Down
Loading

0 comments on commit 88f1333

Please sign in to comment.