diff --git a/.travis.yml b/.travis.yml index 7231706..ca0ea00 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,9 +7,9 @@ branches: jobs: include: - stage: test - name: "Unit tests - OracleJDK 8" + name: "Unit tests - OpenJDK 8" script: mvn -B -DtestLogLevel=OFF test - jdk: oraclejdk8 + jdk: openjdk8 - name: "Unit tests - OpenJDK 11" if: type != pull_request script: mvn -B -DtestLogLevel=OFF test diff --git a/pom.xml b/pom.xml index 3bb9990..86d0a6b 100644 --- a/pom.xml +++ b/pom.xml @@ -15,8 +15,8 @@ ~ limitations under the License. ~ --> - 4.0.0 @@ -28,7 +28,7 @@ io.reactiverse consul-cluster-manager - 1.1.0 + 1.2.0 Vert.x Consul Cluster Manager https://github.com/reactiverse/consul-cluster-manager Consul - based cluster manager that can be hooked into Vert.x ecosystem. @@ -63,7 +63,7 @@ - 3.7.1 + 3.8.0 4.12 2.1.2 diff --git a/src/main/java/io/vertx/spi/cluster/consul/ConsulClusterManager.java b/src/main/java/io/vertx/spi/cluster/consul/ConsulClusterManager.java index 777116b..6f94947 100644 --- a/src/main/java/io/vertx/spi/cluster/consul/ConsulClusterManager.java +++ b/src/main/java/io/vertx/spi/cluster/consul/ConsulClusterManager.java @@ -15,7 +15,13 @@ */ package io.vertx.spi.cluster.consul; -import io.vertx.core.*; +import io.vertx.core.AsyncResult; +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.VertxException; import io.vertx.core.impl.TaskQueue; import io.vertx.core.impl.VertxInternal; import io.vertx.core.json.JsonObject; @@ -29,12 +35,31 @@ import io.vertx.core.spi.cluster.AsyncMultiMap; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.NodeListener; -import io.vertx.ext.consul.*; -import io.vertx.spi.cluster.consul.impl.*; +import io.vertx.ext.consul.CheckList; +import io.vertx.ext.consul.CheckOptions; +import io.vertx.ext.consul.CheckStatus; +import io.vertx.ext.consul.ConsulClient; +import io.vertx.ext.consul.ConsulClientOptions; +import io.vertx.ext.consul.KeyValueOptions; +import io.vertx.ext.consul.ServiceOptions; +import io.vertx.spi.cluster.consul.impl.ClusterManagerInternalContext; +import io.vertx.spi.cluster.consul.impl.ConsulAsyncMap; +import io.vertx.spi.cluster.consul.impl.ConsulAsyncMultiMap; +import io.vertx.spi.cluster.consul.impl.ConsulCounter; +import io.vertx.spi.cluster.consul.impl.ConsulLock; +import io.vertx.spi.cluster.consul.impl.ConsulMap; +import io.vertx.spi.cluster.consul.impl.ConsulSyncMap; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -65,19 +90,6 @@ public class ConsulClusterManager extends ConsulMap implements C private static final String NODES_MAP_NAME = "__vertx.nodes"; private static final String SERVICE_NAME = "vert.x-cluster-manager"; private static final long TCP_CHECK_INTERVAL = 10_000; - - private final Map locks = new ConcurrentHashMap<>(); - private final Map counters = new ConcurrentHashMap<>(); - private final Map> syncMaps = new ConcurrentHashMap<>(); - private final Map> asyncMaps = new ConcurrentHashMap<>(); - private final Map> asyncMultiMaps = new ConcurrentHashMap<>(); - private final JsonObject consulClusterManagerConfig; - - /* - * A set that attempts to keep all cluster node's data locally cached. Cluster manager - * watches the consul "__vertx.nodes" path, responds to update/create/delete events, pull down the data. - */ - private final Set nodes = new HashSet<>(); /* * We have to lock the "node joining the cluster" and "node leaving the cluster" operations * to stay as much as possible transactionally in sync with consul kv store. @@ -94,6 +106,17 @@ public class ConsulClusterManager extends ConsulMap implements C */ private final static String NODE_JOINING_LOCK_NAME = "nodeJoining"; private final static String NODE_LEAVING_LOCK_NAME = "nodeLeaving"; + private final Map locks = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map> syncMaps = new ConcurrentHashMap<>(); + private final Map> asyncMaps = new ConcurrentHashMap<>(); + private final Map> asyncMultiMaps = new ConcurrentHashMap<>(); + private final JsonObject consulClusterManagerConfig; + /* + * A set that attempts to keep all cluster node's data locally cached. Cluster manager + * watches the consul "__vertx.nodes" path, responds to update/create/delete events, pull down the data. + */ + private final Set nodes = new HashSet<>(); private final AtomicReference nodeJoiningLock = new AtomicReference<>(); private final AtomicReference nodeLeavingLock = new AtomicReference<>(); @@ -162,18 +185,18 @@ public void setVertx(Vertx vertx) { @Override public void getAsyncMultiMap(String name, Handler>> asyncResultHandler) { - Future> futureAsyncMultiMap = Future.future(); + Promise> promiseAsyncMultiMap = Promise.promise(); AsyncMultiMap asyncMultiMap = asyncMultiMaps.computeIfAbsent(name, key -> new ConsulAsyncMultiMap<>(name, preferConsistency, appContext)); - futureAsyncMultiMap.complete(asyncMultiMap); - futureAsyncMultiMap.setHandler(asyncResultHandler); + promiseAsyncMultiMap.complete(asyncMultiMap); + promiseAsyncMultiMap.future().setHandler(asyncResultHandler); } @Override public void getAsyncMap(String name, Handler>> asyncResultHandler) { - Future> futureAsyncMap = Future.future(); + Promise> promiseAsyncMap = Promise.promise(); AsyncMap asyncMap = asyncMaps.computeIfAbsent(name, key -> new ConsulAsyncMap<>(name, appContext, this)); - futureAsyncMap.complete(asyncMap); - futureAsyncMap.setHandler(asyncResultHandler); + promiseAsyncMap.complete(asyncMap); + promiseAsyncMap.future().setHandler(asyncResultHandler); } @Override @@ -209,10 +232,10 @@ public void getLockWithTimeout(String name, long timeout, Handler> resultHandler) { Objects.requireNonNull(name); - Future counterFuture = Future.future(); + Promise counterFuture = Promise.promise(); Counter counter = counters.computeIfAbsent(name, key -> new ConsulCounter(name, appContext)); counterFuture.complete(counter); - counterFuture.setHandler(resultHandler); + counterFuture.future().setHandler(resultHandler); } @Override @@ -353,7 +376,7 @@ private Future registerSessionAndSave() { * @param details - IP address of node. */ private Future addLocalNode(JsonObject details) { - Future lockFuture = Future.future(); + Promise lockPromise = Promise.promise(); /* * Time to fight for a "node_joining" lock should exceed @{code TCP_CHECK_INTERVAL} - this is done in purpose for the situation * where node is the middle of joining the cluster (new entry has been placed under __vertx.nodes map) @@ -363,8 +386,8 @@ private Future addLocalNode(JsonObject details) { * This gives another node a chance to acquire "node_joining" lock. */ long timeout = TCP_CHECK_INTERVAL + 100; - getLockWithTimeout(NODE_JOINING_LOCK_NAME, timeout, lockFuture.completer()); - return lockFuture.compose(aLock -> { + getLockWithTimeout(NODE_JOINING_LOCK_NAME, timeout, lockPromise); + return lockPromise.future().compose(aLock -> { nodeJoiningLock.set(aLock); return putPlainValue( keyPath(appContext.getNodeId()), @@ -384,10 +407,10 @@ private Future addLocalNode(JsonObject details) { * Removes local node (the one that uses this cluster manager) from the cluster (existing entry gets removed from {@code NODES_MAP_NAME}). */ private Future removeLocalNode() { - Future lockFuture = Future.future(); + Promise lockPromise = Promise.promise(); long timeout = TCP_CHECK_INTERVAL + 100; - getLockWithTimeout(NODE_LEAVING_LOCK_NAME, timeout, lockFuture.completer()); - return lockFuture.compose(aLock -> { + getLockWithTimeout(NODE_LEAVING_LOCK_NAME, timeout, lockPromise); + return lockPromise.future().compose(aLock -> { nodeLeavingLock.set(aLock); return deleteValueByKeyPath(keyPath(appContext.getNodeId())); }).compose(nodeRemoved -> { @@ -412,9 +435,9 @@ private Future isClusterEmpty() { private Future clearHaInfoMap() { return isClusterEmpty().compose(clusterEmpty -> { if (clusterEmpty) { - Future clearHaInfoFuture = Future.future(); - ((ConsulSyncMap) getSyncMap(HA_INFO_MAP_NAME)).clear(handler -> clearHaInfoFuture.complete()); - return clearHaInfoFuture; + Promise clearHaInfoPromise = Promise.promise(); + ((ConsulSyncMap) getSyncMap(HA_INFO_MAP_NAME)).clear(handler -> clearHaInfoPromise.complete()); + return clearHaInfoPromise.future(); } else { return Future.succeededFuture(); } @@ -474,7 +497,7 @@ public void entryUpdated(EntryEvent event) { * Creates simple tcp server used to receive heart beat messages from consul agent and acknowledge them. */ private Future createTcpServer() { - Future future = Future.future(); + Promise promise = Promise.promise(); /* * Figuring out the node's host address (host ip address) is always tricky especially in case * multiple network interfaces present on the host this cluster manager operates on. @@ -488,7 +511,7 @@ private Future createTcpServer() { nodeTcpAddress.put("host", InetAddress.getLocalHost().getHostAddress()); } catch (UnknownHostException e) { log.error(e); - future.fail(e); + promise.fail(e); } } @@ -498,20 +521,20 @@ private Future createTcpServer() { tcpServer.listen(listenEvent -> { if (listenEvent.succeeded()) { nodeTcpAddress.put("port", listenEvent.result().actualPort()); - future.complete(); - } else future.fail(listenEvent.cause()); + promise.complete(); + } else promise.fail(listenEvent.cause()); }); - return future; + return promise.future(); } /** * Shut downs CM tcp server. */ private Future shutdownTcpServer() { - Future future = Future.future(); - if (tcpServer != null) tcpServer.close(future.completer()); - else future.complete(); - return future; + Promise promise = Promise.promise(); + if (tcpServer != null) tcpServer.close(promise); + else promise.complete(); + return promise.future(); } /** @@ -520,7 +543,7 @@ private Future shutdownTcpServer() { * @return {@link Future} holding the result. */ private Future registerService() { - Future future = Future.future(); + Promise promise = Promise.promise(); ServiceOptions serviceOptions = new ServiceOptions(); serviceOptions.setName(SERVICE_NAME); serviceOptions.setTags(Collections.singletonList("vertx-clustering")); @@ -529,10 +552,10 @@ private Future registerService() { appContext.getConsulClient().registerService(serviceOptions, asyncResult -> { if (asyncResult.failed()) { log.error("[" + appContext.getNodeId() + "]" + " - Failed to register vert.x cluster management service.", asyncResult.cause()); - future.fail(asyncResult.cause()); - } else future.complete(); + promise.fail(asyncResult.cause()); + } else promise.complete(); }); - return future; + return promise.future(); } /** @@ -547,7 +570,7 @@ private Future registerService() { * @return {@link Future} holding the result. */ private Future registerTcpCheck() { - Future future = Future.future(); + Promise promise = Promise.promise(); CheckOptions checkOptions = new CheckOptions() .setName(checkId) .setNotes("This check is dedicated to service with id: " + appContext.getNodeId()) @@ -559,10 +582,10 @@ private Future registerTcpCheck() { appContext.getConsulClient().registerCheck(checkOptions, result -> { if (result.failed()) { log.error("[" + appContext.getNodeId() + "]" + " - Failed to register check: " + checkOptions.getId(), result.cause()); - future.fail(result.cause()); - } else future.complete(); + promise.fail(result.cause()); + } else promise.complete(); }); - return future; + return promise.future(); } /** @@ -571,30 +594,30 @@ private Future registerTcpCheck() { * @return {@link Future} holding the result. */ private Future deregisterTcpCheck() { - Future future = Future.future(); + Promise promise = Promise.promise(); appContext.getConsulClient().deregisterCheck(checkId, resultHandler -> { - if (resultHandler.succeeded()) future.complete(); + if (resultHandler.succeeded()) promise.complete(); else { log.error("[" + appContext.getNodeId() + "]" + " - Failed to deregister check: " + checkId, resultHandler.cause()); - future.fail(resultHandler.cause()); + promise.fail(resultHandler.cause()); } }); - return future; + return promise.future(); } /** * Tries to deregister all failing tcp checks from {@code SERVICE_NAME}. */ private Future deregisterFailingTcpChecks() { - Future checkListFuture = Future.future(); - appContext.getConsulClient().healthChecks(SERVICE_NAME, checkListFuture.completer()); - return checkListFuture.compose(checkList -> { + Promise checkListPromise = Promise.promise(); + appContext.getConsulClient().healthChecks(SERVICE_NAME, checkListPromise); + return checkListPromise.future().compose(checkList -> { List futures = new ArrayList<>(); checkList.getList().forEach(check -> { if (check.getStatus() == CheckStatus.CRITICAL) { - Future future = Future.future(); - appContext.getConsulClient().deregisterCheck(check.getId(), future.completer()); - futures.add(future); + Promise promise = Promise.promise(); + appContext.getConsulClient().deregisterCheck(check.getId(), promise); + futures.add(promise.future()); } }); return CompositeFuture.all(futures).compose(compositeFuture -> Future.succeededFuture()); diff --git a/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMap.java b/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMap.java index 5a9af0d..1481f65 100644 --- a/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMap.java +++ b/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMap.java @@ -15,10 +15,7 @@ */ package io.vertx.spi.cluster.consul.impl; -import io.vertx.core.AsyncResult; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; +import io.vertx.core.*; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.core.shareddata.AsyncMap; @@ -39,7 +36,7 @@ *

* Note: given map is used by vertx nodes to share the data, * entries of this map are always PERSISTENT and NOT EPHEMERAL. - * + *

* For ttl handling see {@link TTLMonitor} * * @author Roman Levytskyi @@ -93,16 +90,16 @@ public void putIfAbsent(K k, V v, long ttl, Handler> completionHa @Override public void remove(K k, Handler> asyncResultHandler) { assertKeyIsNotNull(k).compose(aVoid -> { - Future future = Future.future(); - get(k, future.completer()); - return future; + Promise promise = Promise.promise(); + get(k, promise); + return promise.future(); }).compose(v -> { - Future future = Future.future(); - if (v == null) future.complete(); + Promise promise = Promise.promise(); + if (v == null) promise.complete(); else deleteValueByKeyPath(keyPath(k)) .compose(removeSucceeded -> removeSucceeded ? succeededFuture(v) : failedFuture("Key + " + k + " wasn't removed.")) - .setHandler(future.completer()); - return future; + .setHandler(promise); + return promise.future(); }).setHandler(asyncResultHandler); } @@ -110,9 +107,9 @@ else deleteValueByKeyPath(keyPath(k)) public void removeIfPresent(K k, V v, Handler> resultHandler) { // removes a value from the map, only if entry already exists with same value. assertKeyAndValueAreNotNull(k, v).compose(aVoid -> { - Future future = Future.future(); - get(k, future.completer()); - return future; + Promise promise = Promise.promise(); + get(k, promise); + return promise.future(); }).compose(value -> { if (v.equals(value)) return deleteValueByKeyPath(keyPath(k)) @@ -125,20 +122,20 @@ public void removeIfPresent(K k, V v, Handler> resultHandle public void replace(K k, V v, Handler> asyncResultHandler) { // replaces the entry only if it is currently mapped to some value. assertKeyAndValueAreNotNull(k, v).compose(aVoid -> { - Future future = Future.future(); - get(k, future.completer()); - return future; + Promise promise = Promise.promise(); + get(k, promise); + return promise.future(); }).compose(value -> { - Future future = Future.future(); + Promise promise = Promise.promise(); if (value == null) { - future.complete(); + promise.complete(); } else { put(k, v, event -> { - if (event.succeeded()) future.complete(value); - else future.fail(event.cause()); + if (event.succeeded()) promise.complete(value); + else promise.fail(event.cause()); }); } - return future; + return promise.future(); }).setHandler(asyncResultHandler); } @@ -148,24 +145,24 @@ public void replaceIfPresent(K k, V oldValue, V newValue, Handler assertValueIsNotNull(newValue)) .compose(aVoid -> { - Future future = Future.future(); - get(k, future.completer()); - return future; + Promise promise = Promise.promise(); + get(k, promise); + return promise.future(); }) .compose(value -> { - Future future = Future.future(); + Promise promise = Promise.promise(); if (value != null) { if (value.equals(oldValue)) put(k, newValue, resultPutHandler -> { if (resultPutHandler.succeeded()) - future.complete(true); // old V: '{}' has been replaced by new V: '{}' where K: '{}'", oldValue, newValue, k + promise.complete(true); // old V: '{}' has been replaced by new V: '{}' where K: '{}'", oldValue, newValue, k else - future.fail(resultPutHandler.cause()); // failed replace old V: '{}' by new V: '{}' where K: '{}' due to: '{}'", oldValue, newValue, k, resultPutHandler.cause() + promise.fail(resultPutHandler.cause()); // failed replace old V: '{}' by new V: '{}' where K: '{}' due to: '{}'", oldValue, newValue, k, resultPutHandler.cause() }); else - future.complete(false); // "An entry with K: '{}' doesn't map to old V: '{}' so it won't get replaced.", k, oldValue); - } else future.complete(false); // An entry with K: '{}' doesn't exist, - return future; + promise.complete(false); // "An entry with K: '{}' doesn't map to old V: '{}' so it won't get replaced.", k, oldValue); + } else promise.complete(false); // An entry with K: '{}' doesn't exist, + return promise.future(); }) .setHandler(resultHandler); } @@ -290,7 +287,7 @@ private static class TTLMonitor { * @return */ Future apply(String keyPath, Optional ttl) { - Future future = Future.future(); + Promise promise = Promise.promise(); if (ttl.isPresent()) { if (log.isDebugEnabled()) { log.debug("[" + nodeId + "] : " + "applying ttl monitoring on: " + keyPath + " with ttl: " + ttl.get()); @@ -298,15 +295,15 @@ Future apply(String keyPath, Optional ttl) { String lockName = "ttlLockOn/" + keyPath; clusterManager.getLockWithTimeout(lockName, 50, lockObtainedEvent -> { setTimer(keyPath, ttl.get(), lockName, lockObtainedEvent); - future.complete(); + promise.complete(); }); } else { // there's no need to monitor an entry -> no ttl is there. // try to remove keyPath from timerMap cancelTimer(keyPath); - future.complete(); + promise.complete(); } - return future; + return promise.future(); } /** diff --git a/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMultiMap.java b/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMultiMap.java index 012736e..e7ab4d1 100644 --- a/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMultiMap.java +++ b/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMultiMap.java @@ -19,6 +19,7 @@ import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.impl.TaskQueue; import io.vertx.core.impl.VertxInternal; import io.vertx.core.json.Json; @@ -41,7 +42,9 @@ import static io.vertx.core.Future.failedFuture; import static io.vertx.core.Future.succeededFuture; -import static io.vertx.spi.cluster.consul.impl.ConversationUtils.*; +import static io.vertx.spi.cluster.consul.impl.ConversationUtils.asConsulEntry; +import static io.vertx.spi.cluster.consul.impl.ConversationUtils.asFutureConsulEntry; +import static io.vertx.spi.cluster.consul.impl.ConversationUtils.asFutureString; /** * Distributed async multimap implementation backed by consul kv store. IMPORTANT: the purpose of async multimap in vertx cluster management is to hold mapping between @@ -91,6 +94,10 @@ public ConsulAsyncMultiMap(String name, boolean preferConsistency, ClusterManage } } + private static String getRidOfNodeId(String consulKeyPath) { + return consulKeyPath.substring(0, consulKeyPath.lastIndexOf("/")); + } + @Override public void add(K k, V v, Handler> completionHandler) { assertKeyAndValueAreNotNull(k, v) @@ -186,7 +193,7 @@ private Future addToConsulKv(K key, Set vs, String nodeId) { * TODO: Is there any way in vert.x ecosystem to execute tasks on the event loop by not giving up an order ? */ private Future> doGet(K key) { - Future> out = Future.future(); + Promise> out = Promise.promise(); VertxInternal vertxInternal = (VertxInternal) appContext.getVertx(); vertxInternal.getOrCreateContext().>executeBlocking(event -> { Future> future = preferConsistency @@ -194,7 +201,7 @@ private Future> doGet(K key) { ChoosableSet choosableSet = completeAndGet(future, 5000); event.complete(choosableSet); }, taskQueue, res -> out.complete(res.result())); - return out; + return out.future(); } private Future> cacheableGet(K key) { @@ -280,10 +287,10 @@ private Future> getAllByKey(String consulKeyPath) { * Returns an set of an internal {@link ConsulEntry} all entries filtered by specified consul key path. */ private Future>>> getAll(String consulKey) { - Future future = Future.future(); - appContext.getConsulClient().getValues(consulKey, future.completer()); + Promise promise = Promise.promise(); + appContext.getConsulClient().getValues(consulKey, promise); - return future.compose(keyValueList -> { + return promise.future().compose(keyValueList -> { List keyValues = nullSafeListResult(keyValueList); List futures = new ArrayList<>(); keyValues @@ -304,10 +311,6 @@ private Future>>> getAll(String consulKey) { }); } - private static String getRidOfNodeId(String consulKeyPath) { - return consulKeyPath.substring(0, consulKeyPath.lastIndexOf("/")); - } - private ChoosableSet toChoosableSet(Set set) { ChoosableSet choosableSet = new ChoosableSet<>(set.size()); set.forEach(choosableSet::add); diff --git a/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulCounter.java b/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulCounter.java index c963456..5a62731 100644 --- a/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulCounter.java +++ b/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulCounter.java @@ -16,8 +16,8 @@ package io.vertx.spi.cluster.consul.impl; import io.vertx.core.AsyncResult; -import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.shareddata.Counter; import io.vertx.ext.consul.KeyValue; import io.vertx.ext.consul.KeyValueOptions; @@ -95,14 +95,14 @@ public void compareAndSet(long expected, long value, Handler { - Future result = Future.future(); + Promise promise = Promise.promise(); final Long preValue = extractActualCounterValue(keyValue); if (preValue == expected) { - putPlainValue(consulKey, String.valueOf(value), null).setHandler(result.completer()); + putPlainValue(consulKey, String.valueOf(value), null).setHandler(promise); } else { - result.complete(false); + promise.complete(false); } - return result; + return promise.future(); }) .setHandler(resultHandler); } @@ -114,7 +114,7 @@ private void calculateAndCompareAndSwap(boolean postGet, Long value, Handler { - Future result = Future.future(); + Promise result = Promise.promise(); final Long preValue = extractActualCounterValue(keyValue); final Long postValue = preValue + value; putPlainValue(consulKey, String.valueOf(postValue), new KeyValueOptions().setCasIndex(keyValue.getModifyIndex())) @@ -124,13 +124,13 @@ private void calculateAndCompareAndSwap(boolean postGet, Long value, Handler putValue(K k, V v, KeyValueOptions keyValueOptions) { * @return {@link Future}} containing result. */ protected Future putPlainValue(String key, String value, KeyValueOptions keyValueOptions) { - Future future = Future.future(); + Promise promise = Promise.promise(); appContext.getConsulClient().putValueWithOptions(key, value, keyValueOptions, resultHandler -> { if (resultHandler.succeeded()) { if (log.isTraceEnabled()) { @@ -92,13 +103,13 @@ protected Future putPlainValue(String key, String value, KeyValueOption log.trace(traceMessage); } } - future.complete(resultHandler.result()); + promise.complete(resultHandler.result()); } else { log.error("[" + appContext.getNodeId() + "]" + " - Failed to put " + key + " -> " + value, resultHandler.cause()); - future.fail(resultHandler.cause()); + promise.fail(resultHandler.cause()); } }); - return future; + return promise.future(); } /** @@ -121,20 +132,20 @@ Future getValue(K k) { * @return @return {@link Future}} containing result. */ Future getPlainValue(String consulKey) { - Future future = Future.future(); + Promise promise = Promise.promise(); appContext.getConsulClient().getValue(consulKey, resultHandler -> { if (resultHandler.succeeded()) { // note: resultHandler.result().getValue() is null if nothing was found. if (log.isTraceEnabled()) { log.trace("[" + appContext.getNodeId() + "]" + " - Entry is found : " + resultHandler.result().getValue() + " by key: " + consulKey); } - future.complete(resultHandler.result()); + promise.complete(resultHandler.result()); } else { log.error("[" + appContext.getNodeId() + "]" + " - Failed to look up an entry by: " + consulKey, resultHandler.cause()); - future.fail(resultHandler.cause()); + promise.fail(resultHandler.cause()); } }); - return future; + return promise.future(); } /** @@ -175,45 +186,45 @@ Future deleteValue(K key) { * @return @return {@link Future}} containing result. */ protected Future deleteValueByKeyPath(String keyPath) { - Future result = Future.future(); + Promise promise = Promise.promise(); appContext.getConsulClient().deleteValue(keyPath, resultHandler -> { if (resultHandler.succeeded()) { if (log.isTraceEnabled()) { log.trace("[" + appContext.getNodeId() + "] " + keyPath + " -> " + " remove is true."); } - result.complete(true); + promise.complete(true); } else { - log.error("[" + appContext.getNodeId() + "]" + " - Failed to remove an entry by keyPath: " + keyPath, result.cause()); - result.fail(resultHandler.cause()); + log.error("[" + appContext.getNodeId() + "]" + " - Failed to remove an entry by keyPath: " + keyPath, resultHandler.cause()); + promise.fail(resultHandler.cause()); } }); - return result; + return promise.future(); } /** * Deletes the entire map. */ Future deleteAll() { - Future future = Future.future(); + Promise promise = Promise.promise(); appContext.getConsulClient().deleteValues(name, result -> { if (result.succeeded()) { if (log.isTraceEnabled()) { log.trace("[" + appContext.getNodeId() + "] - has removed all of: " + name); } - future.complete(); + promise.complete(); } else { log.error("[" + appContext.getNodeId() + "]" + " - Failed to clear an entire: " + name); - future.fail(result.cause()); + promise.fail(result.cause()); } }); - return future; + return promise.future(); } /** * @return {@link Future} of plain consul kv map's keys. */ protected Future> plainKeys() { - Future> futureKeys = Future.future(); + Promise> futureKeys = Promise.promise(); appContext.getConsulClient().getKeys(name, resultHandler -> { if (resultHandler.succeeded()) { if (log.isTraceEnabled()) { @@ -225,22 +236,22 @@ protected Future> plainKeys() { futureKeys.fail(resultHandler.cause()); } }); - return futureKeys; + return futureKeys.future(); } /** * @return {@link Future} of plain consul kv map's entries. */ Future> plainEntries() { - Future> keyValueListFuture = Future.future(); + Promise> keyValueListPromise = Promise.promise(); appContext.getConsulClient().getValues(name, resultHandler -> { - if (resultHandler.succeeded()) keyValueListFuture.complete(nullSafeListResult(resultHandler.result())); + if (resultHandler.succeeded()) keyValueListPromise.complete(nullSafeListResult(resultHandler.result())); else { log.error("[" + appContext.getNodeId() + "]" + " - Failed to fetch entries of: " + name, resultHandler.cause()); - keyValueListFuture.fail(resultHandler.cause()); + keyValueListPromise.fail(resultHandler.cause()); } }); - return keyValueListFuture; + return keyValueListPromise.future(); } /** @@ -251,7 +262,7 @@ Future> plainEntries() { * @return {@link Future} session id. */ protected Future registerSession(String sessionName, String checkId) { - Future future = Future.future(); + Promise promise = Promise.promise(); SessionOptions sessionOptions = new SessionOptions() .setBehavior(SessionBehavior.DELETE) .setLockDelay(0) @@ -263,32 +274,32 @@ protected Future registerSession(String sessionName, String checkId) { if (log.isTraceEnabled()) { log.trace("[" + appContext.getNodeId() + "]" + " - " + sessionName + ": " + session.result() + " has been registered."); } - future.complete(session.result()); + promise.complete(session.result()); } else { log.error("[" + appContext.getNodeId() + "]" + " - Failed to register the session.", session.cause()); - future.fail(session.cause()); + promise.fail(session.cause()); } }); - return future; + return promise.future(); } /** * Destroys node's session in consul. */ protected Future destroySession(String sessionId) { - Future future = Future.future(); + Promise promise = Promise.promise(); appContext.getConsulClient().destroySession(sessionId, resultHandler -> { if (resultHandler.succeeded()) { if (log.isTraceEnabled()) { log.trace("[" + appContext.getNodeId() + "]" + " - Session: " + sessionId + " has been successfully destroyed."); } - future.complete(); + promise.complete(); } else { log.error("[" + appContext.getNodeId() + "]" + " - Failed to destroy session: " + sessionId, resultHandler.cause()); - future.fail(resultHandler.cause()); + promise.fail(resultHandler.cause()); } }); - return future; + return promise.future(); } /** diff --git a/src/main/java/io/vertx/spi/cluster/consul/impl/ConversationUtils.java b/src/main/java/io/vertx/spi/cluster/consul/impl/ConversationUtils.java index 655764b..d08498f 100644 --- a/src/main/java/io/vertx/spi/cluster/consul/impl/ConversationUtils.java +++ b/src/main/java/io/vertx/spi/cluster/consul/impl/ConversationUtils.java @@ -16,11 +16,20 @@ package io.vertx.spi.cluster.consul.impl; import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonObject; import io.vertx.core.shareddata.impl.ClusterSerializable; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; import java.lang.reflect.Constructor; import java.util.Base64; import java.util.Optional; @@ -103,30 +112,30 @@ static Optional asTtlConsulEntry(String consulEntry) { * {@link Future} wrapper around asString. */ static Future asFutureString(K key, V value, String nodeId) { - Future result = Future.future(); + Promise result = Promise.promise(); try { result.complete(asString(key, value, nodeId, Optional.empty())); } catch (IOException e) { result.fail(e); } - return result; + return result.future(); } /** * {@link Future} wrapper around asString that takes into account TTL. */ static Future asFutureString(K key, V value, String nodeId, Long ttl) { - Future result = Future.future(); + Promise result = Promise.promise(); try { result.complete(asString(key, value, nodeId, Optional.ofNullable(ttl))); } catch (IOException e) { result.fail(e); } - return result; + return result.future(); } static Future> asFutureConsulEntry(String object) { - Future> result = Future.future(); + Promise> result = Promise.promise(); if (object == null) result.complete(); else { try { @@ -135,7 +144,7 @@ static Future> asFutureConsulEntry(String object) { result.fail(e); } } - return result; + return result.future(); } /**