Skip to content

Commit

Permalink
Merge pull request #12 from reactiverse/release/vertx-3.8.0
Browse files Browse the repository at this point in the history
Work for release 1.2.0
  • Loading branch information
romalev authored Aug 28, 2019
2 parents fa377db + 59a0acf commit 52cee4b
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 162 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ branches:
jobs:
include:
- stage: test
name: "Unit tests - OracleJDK 8"
name: "Unit tests - OpenJDK 8"
script: mvn -B -DtestLogLevel=OFF test
jdk: oraclejdk8
jdk: openjdk8
- name: "Unit tests - OpenJDK 11"
if: type != pull_request
script: mvn -B -DtestLogLevel=OFF test
Expand Down
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

Expand All @@ -28,7 +28,7 @@

<groupId>io.reactiverse</groupId>
<artifactId>consul-cluster-manager</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<name>Vert.x Consul Cluster Manager</name>
<url>https://github.com/reactiverse/consul-cluster-manager</url>
<description>Consul - based cluster manager that can be hooked into Vert.x ecosystem.</description>
Expand Down Expand Up @@ -63,7 +63,7 @@
</developers>

<properties>
<stack.version>3.7.1</stack.version>
<stack.version>3.8.0</stack.version>
<junit.version>4.12</junit.version>
<embedded.consul.version>2.1.2</embedded.consul.version>
</properties>
Expand Down
145 changes: 84 additions & 61 deletions src/main/java/io/vertx/spi/cluster/consul/ConsulClusterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
*/
package io.vertx.spi.cluster.consul;

import io.vertx.core.*;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.impl.TaskQueue;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
Expand All @@ -29,12 +35,31 @@
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.ext.consul.*;
import io.vertx.spi.cluster.consul.impl.*;
import io.vertx.ext.consul.CheckList;
import io.vertx.ext.consul.CheckOptions;
import io.vertx.ext.consul.CheckStatus;
import io.vertx.ext.consul.ConsulClient;
import io.vertx.ext.consul.ConsulClientOptions;
import io.vertx.ext.consul.KeyValueOptions;
import io.vertx.ext.consul.ServiceOptions;
import io.vertx.spi.cluster.consul.impl.ClusterManagerInternalContext;
import io.vertx.spi.cluster.consul.impl.ConsulAsyncMap;
import io.vertx.spi.cluster.consul.impl.ConsulAsyncMultiMap;
import io.vertx.spi.cluster.consul.impl.ConsulCounter;
import io.vertx.spi.cluster.consul.impl.ConsulLock;
import io.vertx.spi.cluster.consul.impl.ConsulMap;
import io.vertx.spi.cluster.consul.impl.ConsulSyncMap;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -65,19 +90,6 @@ public class ConsulClusterManager extends ConsulMap<String, String> implements C
private static final String NODES_MAP_NAME = "__vertx.nodes";
private static final String SERVICE_NAME = "vert.x-cluster-manager";
private static final long TCP_CHECK_INTERVAL = 10_000;

private final Map<String, Lock> locks = new ConcurrentHashMap<>();
private final Map<String, Counter> counters = new ConcurrentHashMap<>();
private final Map<String, Map<?, ?>> syncMaps = new ConcurrentHashMap<>();
private final Map<String, AsyncMap<?, ?>> asyncMaps = new ConcurrentHashMap<>();
private final Map<String, AsyncMultiMap<?, ?>> asyncMultiMaps = new ConcurrentHashMap<>();
private final JsonObject consulClusterManagerConfig;

/*
* A set that attempts to keep all cluster node's data locally cached. Cluster manager
* watches the consul "__vertx.nodes" path, responds to update/create/delete events, pull down the data.
*/
private final Set<String> nodes = new HashSet<>();
/*
* We have to lock the "node joining the cluster" and "node leaving the cluster" operations
* to stay as much as possible transactionally in sync with consul kv store.
Expand All @@ -94,6 +106,17 @@ public class ConsulClusterManager extends ConsulMap<String, String> implements C
*/
private final static String NODE_JOINING_LOCK_NAME = "nodeJoining";
private final static String NODE_LEAVING_LOCK_NAME = "nodeLeaving";
private final Map<String, Lock> locks = new ConcurrentHashMap<>();
private final Map<String, Counter> counters = new ConcurrentHashMap<>();
private final Map<String, Map<?, ?>> syncMaps = new ConcurrentHashMap<>();
private final Map<String, AsyncMap<?, ?>> asyncMaps = new ConcurrentHashMap<>();
private final Map<String, AsyncMultiMap<?, ?>> asyncMultiMaps = new ConcurrentHashMap<>();
private final JsonObject consulClusterManagerConfig;
/*
* A set that attempts to keep all cluster node's data locally cached. Cluster manager
* watches the consul "__vertx.nodes" path, responds to update/create/delete events, pull down the data.
*/
private final Set<String> nodes = new HashSet<>();
private final AtomicReference<Lock> nodeJoiningLock = new AtomicReference<>();
private final AtomicReference<Lock> nodeLeavingLock = new AtomicReference<>();

Expand Down Expand Up @@ -162,18 +185,18 @@ public void setVertx(Vertx vertx) {

@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> asyncResultHandler) {
Future<AsyncMultiMap<K, V>> futureAsyncMultiMap = Future.future();
Promise<AsyncMultiMap<K, V>> promiseAsyncMultiMap = Promise.promise();
AsyncMultiMap asyncMultiMap = asyncMultiMaps.computeIfAbsent(name, key -> new ConsulAsyncMultiMap<>(name, preferConsistency, appContext));
futureAsyncMultiMap.complete(asyncMultiMap);
futureAsyncMultiMap.setHandler(asyncResultHandler);
promiseAsyncMultiMap.complete(asyncMultiMap);
promiseAsyncMultiMap.future().setHandler(asyncResultHandler);
}

@Override
public <K, V> void getAsyncMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> asyncResultHandler) {
Future<AsyncMap<K, V>> futureAsyncMap = Future.future();
Promise<AsyncMap<K, V>> promiseAsyncMap = Promise.promise();
AsyncMap asyncMap = asyncMaps.computeIfAbsent(name, key -> new ConsulAsyncMap<>(name, appContext, this));
futureAsyncMap.complete(asyncMap);
futureAsyncMap.setHandler(asyncResultHandler);
promiseAsyncMap.complete(asyncMap);
promiseAsyncMap.future().setHandler(asyncResultHandler);
}

@Override
Expand Down Expand Up @@ -209,10 +232,10 @@ public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lo
@Override
public void getCounter(String name, Handler<AsyncResult<Counter>> resultHandler) {
Objects.requireNonNull(name);
Future<Counter> counterFuture = Future.future();
Promise<Counter> counterFuture = Promise.promise();
Counter counter = counters.computeIfAbsent(name, key -> new ConsulCounter(name, appContext));
counterFuture.complete(counter);
counterFuture.setHandler(resultHandler);
counterFuture.future().setHandler(resultHandler);
}

@Override
Expand Down Expand Up @@ -353,7 +376,7 @@ private Future<Void> registerSessionAndSave() {
* @param details - IP address of node.
*/
private Future<Void> addLocalNode(JsonObject details) {
Future<Lock> lockFuture = Future.future();
Promise<Lock> lockPromise = Promise.promise();
/*
* Time to fight for a "node_joining" lock should exceed @{code TCP_CHECK_INTERVAL} - this is done in purpose for the situation
* where node is the middle of joining the cluster (new entry has been placed under __vertx.nodes map)
Expand All @@ -363,8 +386,8 @@ private Future<Void> addLocalNode(JsonObject details) {
* This gives another node a chance to acquire "node_joining" lock.
*/
long timeout = TCP_CHECK_INTERVAL + 100;
getLockWithTimeout(NODE_JOINING_LOCK_NAME, timeout, lockFuture.completer());
return lockFuture.compose(aLock -> {
getLockWithTimeout(NODE_JOINING_LOCK_NAME, timeout, lockPromise);
return lockPromise.future().compose(aLock -> {
nodeJoiningLock.set(aLock);
return putPlainValue(
keyPath(appContext.getNodeId()),
Expand All @@ -384,10 +407,10 @@ private Future<Void> addLocalNode(JsonObject details) {
* Removes local node (the one that uses this cluster manager) from the cluster (existing entry gets removed from {@code NODES_MAP_NAME}).
*/
private Future<Void> removeLocalNode() {
Future<Lock> lockFuture = Future.future();
Promise<Lock> lockPromise = Promise.promise();
long timeout = TCP_CHECK_INTERVAL + 100;
getLockWithTimeout(NODE_LEAVING_LOCK_NAME, timeout, lockFuture.completer());
return lockFuture.compose(aLock -> {
getLockWithTimeout(NODE_LEAVING_LOCK_NAME, timeout, lockPromise);
return lockPromise.future().compose(aLock -> {
nodeLeavingLock.set(aLock);
return deleteValueByKeyPath(keyPath(appContext.getNodeId()));
}).compose(nodeRemoved -> {
Expand All @@ -412,9 +435,9 @@ private Future<Boolean> isClusterEmpty() {
private Future<Void> clearHaInfoMap() {
return isClusterEmpty().compose(clusterEmpty -> {
if (clusterEmpty) {
Future<Void> clearHaInfoFuture = Future.future();
((ConsulSyncMap) getSyncMap(HA_INFO_MAP_NAME)).clear(handler -> clearHaInfoFuture.complete());
return clearHaInfoFuture;
Promise<Void> clearHaInfoPromise = Promise.promise();
((ConsulSyncMap) getSyncMap(HA_INFO_MAP_NAME)).clear(handler -> clearHaInfoPromise.complete());
return clearHaInfoPromise.future();
} else {
return Future.succeededFuture();
}
Expand Down Expand Up @@ -474,7 +497,7 @@ public void entryUpdated(EntryEvent event) {
* Creates simple tcp server used to receive heart beat messages from consul agent and acknowledge them.
*/
private Future<Void> createTcpServer() {
Future<Void> future = Future.future();
Promise<Void> promise = Promise.promise();
/*
* Figuring out the node's host address (host ip address) is always tricky especially in case
* multiple network interfaces present on the host this cluster manager operates on.
Expand All @@ -488,7 +511,7 @@ private Future<Void> createTcpServer() {
nodeTcpAddress.put("host", InetAddress.getLocalHost().getHostAddress());
} catch (UnknownHostException e) {
log.error(e);
future.fail(e);
promise.fail(e);
}
}

Expand All @@ -498,20 +521,20 @@ private Future<Void> createTcpServer() {
tcpServer.listen(listenEvent -> {
if (listenEvent.succeeded()) {
nodeTcpAddress.put("port", listenEvent.result().actualPort());
future.complete();
} else future.fail(listenEvent.cause());
promise.complete();
} else promise.fail(listenEvent.cause());
});
return future;
return promise.future();
}

/**
* Shut downs CM tcp server.
*/
private Future<Void> shutdownTcpServer() {
Future<Void> future = Future.future();
if (tcpServer != null) tcpServer.close(future.completer());
else future.complete();
return future;
Promise<Void> promise = Promise.promise();
if (tcpServer != null) tcpServer.close(promise);
else promise.complete();
return promise.future();
}

/**
Expand All @@ -520,7 +543,7 @@ private Future<Void> shutdownTcpServer() {
* @return {@link Future} holding the result.
*/
private Future<Void> registerService() {
Future<Void> future = Future.future();
Promise<Void> promise = Promise.promise();
ServiceOptions serviceOptions = new ServiceOptions();
serviceOptions.setName(SERVICE_NAME);
serviceOptions.setTags(Collections.singletonList("vertx-clustering"));
Expand All @@ -529,10 +552,10 @@ private Future<Void> registerService() {
appContext.getConsulClient().registerService(serviceOptions, asyncResult -> {
if (asyncResult.failed()) {
log.error("[" + appContext.getNodeId() + "]" + " - Failed to register vert.x cluster management service.", asyncResult.cause());
future.fail(asyncResult.cause());
} else future.complete();
promise.fail(asyncResult.cause());
} else promise.complete();
});
return future;
return promise.future();
}

/**
Expand All @@ -547,7 +570,7 @@ private Future<Void> registerService() {
* @return {@link Future} holding the result.
*/
private Future<Void> registerTcpCheck() {
Future<Void> future = Future.future();
Promise<Void> promise = Promise.promise();
CheckOptions checkOptions = new CheckOptions()
.setName(checkId)
.setNotes("This check is dedicated to service with id: " + appContext.getNodeId())
Expand All @@ -559,10 +582,10 @@ private Future<Void> registerTcpCheck() {
appContext.getConsulClient().registerCheck(checkOptions, result -> {
if (result.failed()) {
log.error("[" + appContext.getNodeId() + "]" + " - Failed to register check: " + checkOptions.getId(), result.cause());
future.fail(result.cause());
} else future.complete();
promise.fail(result.cause());
} else promise.complete();
});
return future;
return promise.future();
}

/**
Expand All @@ -571,30 +594,30 @@ private Future<Void> registerTcpCheck() {
* @return {@link Future} holding the result.
*/
private Future<Void> deregisterTcpCheck() {
Future<Void> future = Future.future();
Promise<Void> promise = Promise.promise();
appContext.getConsulClient().deregisterCheck(checkId, resultHandler -> {
if (resultHandler.succeeded()) future.complete();
if (resultHandler.succeeded()) promise.complete();
else {
log.error("[" + appContext.getNodeId() + "]" + " - Failed to deregister check: " + checkId, resultHandler.cause());
future.fail(resultHandler.cause());
promise.fail(resultHandler.cause());
}
});
return future;
return promise.future();
}

/**
* Tries to deregister all failing tcp checks from {@code SERVICE_NAME}.
*/
private Future<Void> deregisterFailingTcpChecks() {
Future<CheckList> checkListFuture = Future.future();
appContext.getConsulClient().healthChecks(SERVICE_NAME, checkListFuture.completer());
return checkListFuture.compose(checkList -> {
Promise<CheckList> checkListPromise = Promise.promise();
appContext.getConsulClient().healthChecks(SERVICE_NAME, checkListPromise);
return checkListPromise.future().compose(checkList -> {
List<Future> futures = new ArrayList<>();
checkList.getList().forEach(check -> {
if (check.getStatus() == CheckStatus.CRITICAL) {
Future<Void> future = Future.future();
appContext.getConsulClient().deregisterCheck(check.getId(), future.completer());
futures.add(future);
Promise<Void> promise = Promise.promise();
appContext.getConsulClient().deregisterCheck(check.getId(), promise);
futures.add(promise.future());
}
});
return CompositeFuture.all(futures).compose(compositeFuture -> Future.succeededFuture());
Expand Down
Loading

0 comments on commit 52cee4b

Please sign in to comment.