Skip to content

Commit

Permalink
Renamed packages client -> api and commons -> common.
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiktor Gromniak committed Aug 11, 2016
1 parent 3fe9ec1 commit d89adba
Show file tree
Hide file tree
Showing 63 changed files with 488 additions and 114 deletions.
21 changes: 21 additions & 0 deletions src/main/java/net/quedex/api/common/CommunicationException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package net.quedex.api.common;

import java.io.IOException;

public class CommunicationException extends IOException {

public CommunicationException() {
}

public CommunicationException(String message) {
super(message);
}

public CommunicationException(String message, Throwable cause) {
super(message, cause);
}

public CommunicationException(Throwable cause) {
super(cause);
}
}
148 changes: 148 additions & 0 deletions src/main/java/net/quedex/api/common/Config.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package net.quedex.api.common;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.io.Resources;
import net.quedex.api.pgp.BcPrivateKey;
import net.quedex.api.pgp.BcPublicKey;
import net.quedex.api.pgp.PGPKeyInitialisationException;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

public class Config {

private final String marketStreamUrl;
private final String marketDataUrl;
private final String userStreamUrl;
private final BcPublicKey qdxPublicKey;
private final BcPrivateKey userPrivateKey;
private final long accountId;
private final int nonceGroup;

public Config(
String marketStreamUrl,
String marketDataUrl,
String userStreamUrl,
BcPublicKey qdxPublicKey,
BcPrivateKey userPrivateKey,
long accountId,
int nonceGroup
) {
checkArgument(!marketStreamUrl.isEmpty(), "Empty marketStreamUrl");
checkArgument(!marketDataUrl.isEmpty(), "Empty marketDataUrl");
checkArgument(!userStreamUrl.isEmpty(), "Empty userStreamUrl");
checkArgument(accountId > 0, "accountId=%s <= 0", accountId);
checkArgument(nonceGroup >= 0, "accountId=%s < 0", nonceGroup);
this.marketStreamUrl = marketStreamUrl + "?keepalive=true";
this.marketDataUrl = marketDataUrl;
this.userStreamUrl = userStreamUrl + "?keepalive=true";
this.qdxPublicKey = checkNotNull(qdxPublicKey, "null qdxPublicKey");
this.userPrivateKey = checkNotNull(userPrivateKey, "null userPrivateKey");
this.accountId = accountId;
this.nonceGroup = nonceGroup;
}

public static Config fromResource(String resourceName, char[] prvKeyPasspharse) {
try {
return fromInputStream(Resources.getResource(resourceName).openStream(), prvKeyPasspharse);
} catch (IOException e) {
throw new IllegalArgumentException("Error reading resource=" + resourceName, e);
}
}

public static Config fromResource(char[] prvKeyPasspharse) {
return fromResource("qdxConfig.properties", prvKeyPasspharse);
}

public static Config fromInputStream(InputStream inputStream, char[] prvKeyPasspharse) {
Properties props = new Properties();
try {
props.load(inputStream);
} catch (IOException e) {
throw new IllegalStateException("Error reading properties file", e);
}

try {
return new Config(
props.getProperty("net.quedex.client.api.marketStreamUrl"),
props.getProperty("net.quedex.client.api.marketDataUrl"),
props.getProperty("net.quedex.client.api.userStreamUrl"),
BcPublicKey.fromArmored(props.getProperty("net.quedex.client.api.qdxPublicKey")),
BcPrivateKey.fromArmored(props.getProperty("net.quedex.client.api.userPrivateKey"), prvKeyPasspharse),
Long.parseLong(props.getProperty("net.quedex.client.api.accountId")),
Integer.parseInt(props.getProperty("net.quedex.client.api.nonceGroup"))
);
} catch (PGPKeyInitialisationException e) {
throw new IllegalArgumentException("Error instantiating keys", e);
}
}

public String getMarketStreamUrl() {
return marketStreamUrl;
}

public String getInstrumentDataUrl() {
return marketDataUrl + "/instrument_data";
}

public String getMarketDataUrl() {
return marketDataUrl;
}

public String getUserStreamUrl() {
return userStreamUrl;
}

public BcPublicKey getQdxPublicKey() {
return qdxPublicKey;
}

public BcPrivateKey getUserPrivateKey() {
return userPrivateKey;
}

public long getAccountId() {
return accountId;
}

public int getNonceGroup() {
return nonceGroup;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Config config = (Config) o;
return accountId == config.accountId &&
nonceGroup == config.nonceGroup &&
Objects.equal(marketStreamUrl, config.marketStreamUrl) &&
Objects.equal(marketDataUrl, config.marketDataUrl) &&
Objects.equal(userStreamUrl, config.userStreamUrl) &&
Objects.equal(qdxPublicKey, config.qdxPublicKey) &&
Objects.equal(userPrivateKey, config.userPrivateKey);
}

@Override
public int hashCode() {
return Objects.hashCode(marketStreamUrl, marketDataUrl, userStreamUrl, qdxPublicKey, userPrivateKey, accountId, nonceGroup);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("marketStreamUrl", marketStreamUrl)
.add("marketDataUrl", marketDataUrl)
.add("userStreamUrl", userStreamUrl)
.add("qdxPublicKey", qdxPublicKey)
.add("userPrivateKey", userPrivateKey)
.add("accountId", accountId)
.add("nonceGroup", nonceGroup)
.toString();
}
}
7 changes: 7 additions & 0 deletions src/main/java/net/quedex/api/common/MaintenanceException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package net.quedex.api.common;

public class MaintenanceException extends CommunicationException {

public MaintenanceException() {
}
}
69 changes: 69 additions & 0 deletions src/main/java/net/quedex/api/common/MessageReceiver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package net.quedex.api.common;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.quedex.api.market.StreamFailureListener;
import net.quedex.api.pgp.PGPExceptionBase;
import org.slf4j.Logger;

import java.io.IOException;

import static com.google.common.base.Preconditions.checkNotNull;

public abstract class MessageReceiver {

public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

private final Logger logger;

private volatile StreamFailureListener streamFailureListener;

protected MessageReceiver(Logger logger) {
this.logger = checkNotNull(logger, "null logger");
}

protected abstract void processData(String data) throws IOException, PGPExceptionBase;

public final void processMessage(String message) {
try {
JsonNode metaJson = OBJECT_MAPPER.readTree(message);

switch (metaJson.get("type").asText()) {
case "data":
processData(metaJson.get("data").asText());
break;
case "error":
processError(metaJson.get("error_code").asText());
break;
default:
// no-op
break;
}
} catch (IOException e) {
onError(new CommunicationException("Error parsing json entity on message=" + message, e));
} catch (PGPExceptionBase e) {
onError(new CommunicationException("PGP error on message=" + message, e));
}
}

private void processError(String errorCode) {
logger.trace("processError({})", errorCode);
if ("maintenance".equals(errorCode)) {
onError(new MaintenanceException());
}
}

private void onError(Exception e) {
logger.warn("onError({})", e);
StreamFailureListener streamFailureListener = this.streamFailureListener;
if (streamFailureListener != null) {
streamFailureListener.onStreamFailure(e);
}
}

public final void registerStreamFailureListener(StreamFailureListener streamFailureListener) {
this.streamFailureListener = streamFailureListener;
}
}
9 changes: 9 additions & 0 deletions src/main/java/net/quedex/api/common/SessionStateListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package net.quedex.api.common;

import net.quedex.api.market.SessionState;

@FunctionalInterface
public interface SessionStateListener {

void onSessionState(SessionState newSessionState);
}
120 changes: 120 additions & 0 deletions src/main/java/net/quedex/api/common/WebsocketStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package net.quedex.api.common;

import net.quedex.api.market.StreamFailureListener;
import org.java_websocket.client.DefaultSSLWebSocketClientFactory;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_17;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;

import javax.net.ssl.SSLContext;
import java.net.URI;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.google.common.base.Preconditions.checkNotNull;

public class WebsocketStream<T extends MessageReceiver> {

private final Logger logger;

protected final WebSocketClient webSocketClient;
private final ExecutorService webSocketClientFactoryExec;
protected final T messageReceiver;

private volatile StreamFailureListener streamFailureListener;

protected WebsocketStream(
Logger logger,
String streamUrl,
T messageReceiver
) {
// TODO: Java-Websockets holds infinitely growing queues
webSocketClient = new WebSocketClient(URI.create(streamUrl), new Draft_17()) {
@Override
public void onOpen(ServerHandshake handshakedata) {
logger.info(
"Websocket opened with url={}, httpStatus={}, httpStatusMessage={}",
streamUrl, handshakedata.getHttpStatus(), handshakedata.getHttpStatusMessage()
);
}

@Override
public void onMessage(String message) {
WebsocketStream.this.processMessage(message);
}

@Override
public void onClose(int code, String reason, boolean remote) {
if (remote) {
WebsocketStream.this.onError(
new CommunicationException("Websocket closed with code=" + code + ", reason=" + reason)
);
} else {
logger.info("Websocket closed with code={}, reason={}", code, reason);
}
}

@Override
public void onError(Exception ex) {
WebsocketStream.this.onError(new CommunicationException("Websocket error", ex));
}
};

webSocketClientFactoryExec = Executors.newSingleThreadExecutor();
try {
SSLContext ssl = SSLContext.getInstance("TLS");
ssl.init(null, null, null);
DefaultSSLWebSocketClientFactory webSocketClientFactory =
new DefaultSSLWebSocketClientFactory(ssl, webSocketClientFactoryExec);
webSocketClient.setWebSocketFactory(webSocketClientFactory);
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new IllegalStateException("Error initialising SSL", e);
}

this.messageReceiver = checkNotNull(messageReceiver, "null messageReceiver");
this.logger = checkNotNull(logger, "null logger");
}

public void registerStreamFailureListener(StreamFailureListener streamFailureListener) {
this.streamFailureListener = streamFailureListener;
messageReceiver.registerStreamFailureListener(streamFailureListener);
}

public void start() throws CommunicationException {
logger.trace("Starting");
try {
webSocketClient.connectBlocking();
} catch (InterruptedException e) {
Thread.interrupted();
}
logger.info("Started");
}

public void stop() throws CommunicationException {
logger.trace("Stopping");
// has to be closed this way because of incompatibilities in WS protocol
webSocketClient.close();
webSocketClient.getConnection().closeConnection(1000, "");
try {
webSocketClient.closeBlocking();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
webSocketClientFactoryExec.shutdown();
logger.info("Stopped");
}

private void processMessage(String message) {
messageReceiver.processMessage(message);
}

private void onError(Exception e) {
StreamFailureListener streamFailureListener = this.streamFailureListener;
if (streamFailureListener != null) {
streamFailureListener.onStreamFailure(e);
}
}
}
Loading

0 comments on commit d89adba

Please sign in to comment.