diff --git a/README.adoc b/README.adoc index 27b3f15..f0d8f82 100644 --- a/README.adoc +++ b/README.adoc @@ -12,6 +12,10 @@ This cluster orchestration library is not meant to be used for writing productio this library fail without even an attempt at managing nor recovering the problem. This is a reasonable limitation for the scope of this library which is to ease writing multi-machine tests. +=== Required java version + +Starting with version 1.1.0, Java 11 is necessary. Version 1.0.3 is the last one compatible with Java 8. + === Configuring a JVM cluster Creating a `ClusterConfiguration` instance is the first step to creating a cluster of JVMs. diff --git a/pom.xml b/pom.xml index 39d5406..d65be8b 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.mortbay.jetty.orchestrator jetty-cluster-orchestrator - 1.0.5-SNAPSHOT + 1.1.0-SNAPSHOT Jetty :: JVM Cluster Orchestrator https://github.com/jetty-project/jetty-cluster-orchestrator @@ -63,7 +63,7 @@ - [1.8,) + [${maven.compiler.source},) [3.5,) @@ -78,7 +78,14 @@ license-maven-plugin false -
header-template.txt
+ + +
header-template.txt
+ + **/*.java + +
+
true true true @@ -88,9 +95,6 @@ DOUBLESLASH_STYLE - - **/*.java -
@@ -187,7 +191,7 @@ maven-javadoc-plugin 3.5.0 - 8 + ${maven.compiler.source} -html5 @@ -279,13 +283,6 @@ 0.35.0 - - org.zeroturnaround - zt-process-killer - 1.10 - test - true - org.junit.jupiter junit-jupiter @@ -316,16 +313,26 @@ lorban Ludovic Orban - lorban@webtide.com - Webtide + lorban@bitronix.be + Webtide, LLC https://webtide.com + 1 olamy - olamy@webtide.com Olivier Lamy - Webtide + oliver.lamy@gmail.com + Webtide, LLC + https://webtide.com + Australia/Brisbane + + + joakime + Joakim Erdfelt + joakim.erdfelt@gmail.com + Webtide, LLC https://webtide.com + -6 diff --git a/src/main/java/org/mortbay/jetty/orchestrator/ClusterTools.java b/src/main/java/org/mortbay/jetty/orchestrator/ClusterTools.java index a3946cb..079d94b 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/ClusterTools.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/ClusterTools.java @@ -33,6 +33,11 @@ public ClusterTools(CuratorFramework curator, GlobalNodeId globalNodeId) this.globalNodeId = globalNodeId; } + public String getNodeId() + { + return globalNodeId.getNodeId(); + } + public Barrier barrier(String name, int count) { return new Barrier(curator, globalNodeId, name, count); diff --git a/src/main/java/org/mortbay/jetty/orchestrator/NodeArray.java b/src/main/java/org/mortbay/jetty/orchestrator/NodeArray.java index 972106e..8768d92 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/NodeArray.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/NodeArray.java @@ -16,8 +16,8 @@ import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -74,36 +74,64 @@ public NodeArrayFuture executeOn(String id, NodeJob nodeJob) if (node == null) throw new IllegalArgumentException("No such node with ID " + id); - List> futures = new ArrayList<>(); + Map> futures = new HashMap<>(); try { CompletableFuture future = node.rpcClient.callAsync(new ExecuteNodeJobCommand(nodeJob)); - futures.add(future); + futures.put(id, future); } catch (Exception e) { CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(e); - futures.add(future); + futures.put(id, future); + } + return new NodeArrayFuture(futures); + } + + public NodeArrayFuture executeOn(Set ids, NodeJob nodeJob) + { + Set missingIds = new HashSet<>(nodes.keySet()); + ids.forEach(missingIds::remove); + if (!missingIds.isEmpty()) + throw new IllegalArgumentException("No such node with ID " + missingIds); + + Map> futures = new HashMap<>(); + for (String id : ids) + { + Node node = nodes.get(id); + try + { + CompletableFuture future = node.rpcClient.callAsync(new ExecuteNodeJobCommand(nodeJob)); + futures.put(id, future); + } + catch (Exception e) + { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(e); + futures.put(id, future); + } } return new NodeArrayFuture(futures); } public NodeArrayFuture executeOnAll(NodeJob nodeJob) { - List> futures = new ArrayList<>(); - for (Node node : nodes.values()) + Map> futures = new HashMap<>(); + for (Map.Entry entry : nodes.entrySet()) { + String nodeId = entry.getKey(); + Node node = entry.getValue(); try { CompletableFuture future = node.rpcClient.callAsync(new ExecuteNodeJobCommand(nodeJob)); - futures.add(future); + futures.put(nodeId, future); } catch (Exception e) { CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(e); - futures.add(future); + futures.put(nodeId, future); } } return new NodeArrayFuture(futures); diff --git a/src/main/java/org/mortbay/jetty/orchestrator/NodeArrayFuture.java b/src/main/java/org/mortbay/jetty/orchestrator/NodeArrayFuture.java index 4bead17..708ea58 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/NodeArrayFuture.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/NodeArrayFuture.java @@ -15,24 +15,27 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; public class NodeArrayFuture { - private final List> futures; + private final Map> futures; - NodeArrayFuture(List> futures) + NodeArrayFuture(Map> futures) { this.futures = futures; } public void cancel(boolean mayInterruptIfRunning) { - futures.forEach(f -> f.cancel(mayInterruptIfRunning)); + futures.forEach((id, f) -> f.cancel(mayInterruptIfRunning)); } public void get(long timeout, TimeUnit unit) throws ExecutionException, TimeoutException @@ -42,7 +45,7 @@ public void get(long timeout, TimeUnit unit) throws ExecutionException, TimeoutE TimeoutException timeoutException = null; List exceptions = new ArrayList<>(); - for (CompletableFuture future : futures) + for (CompletableFuture future : futures.values()) { long begin = System.nanoTime(); try @@ -102,9 +105,30 @@ public void get() throws ExecutionException } } + public Set getAllNodeIds() + { + return futures.keySet(); + } + + public Set getDoneNodeIds() + { + return futures.entrySet().stream() + .filter(e -> e.getValue().isDone()) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + + public Set getNotDoneNodeIds() + { + return futures.entrySet().stream() + .filter(e -> !e.getValue().isDone()) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + public boolean isAllDone() { - return futures.stream() + return futures.values().stream() .map(Future::isDone) .reduce((b1, b2) -> b1 && b2) .orElse(true); @@ -112,7 +136,7 @@ public boolean isAllDone() public boolean isAnyDone() { - return futures.stream() + return futures.values().stream() .map(Future::isDone) .reduce((b1, b2) -> b1 || b2) .orElse(true); diff --git a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystem.java b/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystem.java index c609ac9..2daae2d 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystem.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystem.java @@ -264,6 +264,7 @@ InputStream newInputStream(NodePath path, OpenOption... options) throws IOExcept } } + @SuppressWarnings("unchecked") A readAttributes(NodePath path, Class type, LinkOption... options) throws IOException { if (!type.equals(BasicFileAttributes.class) && !type.equals(NodeFileAttributes.class)) diff --git a/src/main/java/org/mortbay/jetty/orchestrator/util/ProcessHolder.java b/src/main/java/org/mortbay/jetty/orchestrator/util/ProcessHolder.java index 4d851c4..c5e0656 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/util/ProcessHolder.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/util/ProcessHolder.java @@ -14,7 +14,6 @@ package org.mortbay.jetty.orchestrator.util; import java.io.Serializable; -import java.lang.reflect.Method; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -22,196 +21,55 @@ public class ProcessHolder implements Serializable { - private static final Impl IMPL; - - static - { - Impl impl; - try - { - impl = new JDK9(); - } - catch (Exception e) - { - try - { - impl = new ZtProcessKiller(); - } - catch (Exception ex) - { - throw new IllegalStateException("Neither of the JDK 9+ ProcessHandle API nor the ZT Process Killer API is available", e); - } - } - IMPL = impl; - } - - private final int pid; + private final long pid; public static ProcessHolder from(Process process) { - try - { - return new ProcessHolder(IMPL.asPid(process)); - } - catch (Exception e) - { - return null; - } + return new ProcessHolder(process.toHandle().pid()); } - private ProcessHolder(int pid) + private ProcessHolder(long pid) { this.pid = pid; } - public int getPid() + public long getPid() { return pid; } - public boolean isAlive() throws Exception + public boolean isAlive() { - return IMPL.isAlive(pid); + return ProcessHandle.of(pid) + .map(ProcessHandle::isAlive) + .orElse(false); } public void destroy() throws Exception { - IMPL.destroy(pid); - } - - @Override - public String toString() - { - return "ProcessHolder{" + - "pid=" + pid + - '}'; - } - - private static abstract class Impl - { - abstract int asPid(Process process) throws Exception; - abstract boolean isAlive(int pid) throws Exception; - abstract void destroy(int pid) throws Exception; - } - - private static class JDK9 extends Impl - { - private final Class processHandleClass; - - public JDK9() throws Exception + Optional optional = ProcessHandle.of(pid); + if (optional.isPresent()) { - processHandleClass = Class.forName("java.lang.ProcessHandle"); - } - - @Override - int asPid(Process process) throws Exception - { - Method toHandleMethod = Process.class.getDeclaredMethod("toHandle"); - Object processHandle = toHandleMethod.invoke(process); - - Method pidMethod = processHandleClass.getDeclaredMethod("pid"); - - long pid = (long)pidMethod.invoke(processHandle); - return (int)pid; - } - - @Override - boolean isAlive(int pid) throws Exception - { - Method ofMethod = processHandleClass.getDeclaredMethod("of", long.class); - Method isAliveMethod = processHandleClass.getDeclaredMethod("isAlive"); - - Optional optProcessHandle = (Optional)ofMethod.invoke(processHandleClass, (long) pid); - if (optProcessHandle.isPresent()) + ProcessHandle processHandle = optional.get(); + CompletableFuture onExit = processHandle.onExit(); + processHandle.destroy(); + try { - Object processHandle = optProcessHandle.get(); - return (boolean)isAliveMethod.invoke(processHandle); + onExit.get(10, TimeUnit.SECONDS); } - return false; - } - - @Override - void destroy(int pid) throws Exception - { - Method ofMethod = processHandleClass.getDeclaredMethod("of", long.class); - Method destroyMethod = processHandleClass.getDeclaredMethod("destroy"); - Method destroyForciblyMethod = processHandleClass.getDeclaredMethod("destroyForcibly"); - Method onExitMethod = processHandleClass.getDeclaredMethod("onExit"); - - Optional optProcessHandle = (Optional)ofMethod.invoke(processHandleClass, (long) pid); - if (optProcessHandle.isPresent()) + catch (TimeoutException e) { - Object processHandle = optProcessHandle.get(); - destroyMethod.invoke(processHandle); - - CompletableFuture cf = (CompletableFuture)onExitMethod.invoke(processHandle); - try - { - cf.get(10, TimeUnit.SECONDS); - } - catch (TimeoutException e) - { - destroyForciblyMethod.invoke(processHandle); - try - { - cf.get(10, TimeUnit.SECONDS); - } - catch (TimeoutException ex) - { - throw new RuntimeException(ex); - } - } + processHandle.destroyForcibly(); + onExit.get(10, TimeUnit.SECONDS); } } } - private static class ZtProcessKiller extends Impl + @Override + public String toString() { - private final Class processesClass; - private final Class pidProcessClass; - private final Class systemProcessClass; - private final Class processUtilClass; - - public ZtProcessKiller() throws Exception - { - processesClass = Class.forName("org.zeroturnaround.process.Processes"); - pidProcessClass = Class.forName("org.zeroturnaround.process.PidProcess"); - systemProcessClass = Class.forName("org.zeroturnaround.process.SystemProcess"); - processUtilClass = Class.forName("org.zeroturnaround.process.ProcessUtil"); - } - - @Override - int asPid(Process process) throws Exception - { - Method newPidProcessMethod = processesClass.getDeclaredMethod("newPidProcess", Process.class); - Method getPidMethod = pidProcessClass.getDeclaredMethod("getPid"); - - Object pidProcess = newPidProcessMethod.invoke(processesClass, process); - - return (int)getPidMethod.invoke(pidProcess); - } - - - @Override - boolean isAlive(int pid) throws Exception - { - Method newPidProcessMethod = processesClass.getDeclaredMethod("newPidProcess", int.class); - Method isAliveMethod = systemProcessClass.getDeclaredMethod("isAlive"); - - Object pidProcess = newPidProcessMethod.invoke(processesClass, pid); - - return (boolean)isAliveMethod.invoke(pidProcess); - } - - @Override - void destroy(int pid) throws Exception - { - Method newPidProcessMethod = processesClass.getDeclaredMethod("newPidProcess", int.class); - Method destroyGracefullyOrForcefullyAndWaitMethod = processUtilClass.getDeclaredMethod("destroyGracefullyOrForcefullyAndWait", systemProcessClass, long.class, TimeUnit.class, long.class, TimeUnit.class); - - Object pidProcess = newPidProcessMethod.invoke(processesClass, pid); - - destroyGracefullyOrForcefullyAndWaitMethod.invoke(processUtilClass, pidProcess, 10L, TimeUnit.SECONDS, 10L, TimeUnit.SECONDS); - } + return "ProcessHolder{" + + "pid=" + pid + + '}'; } }