diff --git a/vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java index b62716d5e14..6636fd01fd4 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -14,6 +14,7 @@ import io.netty.channel.EventLoop; import io.vertx.core.*; import io.vertx.core.Future; +import io.vertx.core.impl.deployment.Deployment; import io.vertx.core.internal.EventExecutor; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; diff --git a/vertx-core/src/main/java/io/vertx/core/impl/DeploymentManager.java b/vertx-core/src/main/java/io/vertx/core/impl/DeploymentManager.java deleted file mode 100644 index 08703d70902..00000000000 --- a/vertx-core/src/main/java/io/vertx/core/impl/DeploymentManager.java +++ /dev/null @@ -1,470 +0,0 @@ -/* - * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 - * which is available at https://www.apache.org/licenses/LICENSE-2.0. - * - * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 - */ - -package io.vertx.core.impl; - -import io.netty.channel.EventLoop; -import io.vertx.core.*; -import io.vertx.core.internal.CloseFuture; -import io.vertx.core.internal.ContextInternal; -import io.vertx.core.json.JsonObject; -import io.vertx.core.internal.logging.Logger; -import io.vertx.core.internal.logging.LoggerFactory; - -import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; - -/** - * @author Tim Fox - */ -public class DeploymentManager { - - private static final Logger log = LoggerFactory.getLogger(DeploymentManager.class); - - private final VertxImpl vertx; - private final Map deploying = new HashMap<>(); - private final Map deployments = new ConcurrentHashMap<>(); - - public DeploymentManager(VertxImpl vertx) { - this.vertx = vertx; - } - - private String generateDeploymentID() { - return UUID.randomUUID().toString(); - } - - public Future deployVerticle(Callable verticleSupplier, DeploymentOptions options) { - if (options.getInstances() < 1) { - throw new IllegalArgumentException("Can't specify < 1 instances to deploy"); - } - options.checkIsolationNotDefined(); - ContextInternal currentContext = vertx.getOrCreateContext(); - ClassLoader cl = options.getClassLoader(); - if (cl == null) { - cl = Thread.currentThread().getContextClassLoader(); - if (cl == null) { - cl = getClass().getClassLoader(); - } - } - return doDeploy(options, v -> "java:" + v.getClass().getName(), currentContext, currentContext, cl, verticleSupplier) - .map(Deployment::deploymentID); - } - - public Future undeployVerticle(String deploymentID) { - Deployment deployment = deployments.get(deploymentID); - ContextInternal currentContext = vertx.getOrCreateContext(); - if (deployment == null) { - return currentContext.failedFuture(new IllegalStateException("Unknown deployment")); - } else { - return deployment.doUndeploy(vertx.getOrCreateContext()); - } - } - - public Set deployments() { - return Collections.unmodifiableSet(deployments.keySet()); - } - - public Deployment getDeployment(String deploymentID) { - return deployments.get(deploymentID); - } - - public Future undeployAll() { - // TODO timeout if it takes too long - e.g. async stop verticle fails to call future - - // We only deploy the top level verticles as the children will be undeployed when the parent is - while (true) { - DeploymentImpl deployment; - synchronized (deploying) { - if (deploying.isEmpty()) { - break; - } - Iterator> it = deploying.entrySet().iterator(); - Map.Entry entry = it.next(); - it.remove(); - deployment = (DeploymentImpl) entry.getValue(); - } - deployment.verticles.forEach(holder -> { - Promise startPromise = holder.startPromise; - if (startPromise != null) { - startPromise.tryFail(new VertxException("Verticle undeployed", true)); - } - }); - } - Set deploymentIDs = new HashSet<>(); - for (Map.Entry entry: deployments.entrySet()) { - if (!entry.getValue().isChild()) { - deploymentIDs.add(entry.getKey()); - } - } - List> completionList = new ArrayList<>(); - if (!deploymentIDs.isEmpty()) { - for (String deploymentID : deploymentIDs) { - Promise promise = Promise.promise(); - completionList.add(promise.future()); - undeployVerticle(deploymentID).onComplete(ar -> { - if (ar.failed()) { - // Log but carry on regardless - log.error("Undeploy failed", ar.cause()); - } - promise.handle(ar); - }); - } - Promise promise = vertx.getOrCreateContext().promise(); - Future.join(completionList).mapEmpty().onComplete(promise); - return promise.future(); - } else { - return vertx.getOrCreateContext().succeededFuture(); - } - } - - Future doDeploy(DeploymentOptions options, - Function identifierProvider, - ContextInternal parentContext, - ContextInternal callingContext, - ClassLoader tccl, Callable verticleSupplier) { - int nbInstances = options.getInstances(); - Set verticles = Collections.newSetFromMap(new IdentityHashMap<>()); - for (int i = 0; i < nbInstances; i++) { - Verticle verticle; - try { - verticle = verticleSupplier.call(); - } catch (Exception e) { - return Future.failedFuture(e); - } - if (verticle == null) { - return Future.failedFuture("Supplied verticle is null"); - } - verticles.add(verticle); - } - if (verticles.size() != nbInstances) { - return Future.failedFuture("Same verticle supplied more than once"); - } - Verticle[] verticlesArray = verticles.toArray(new Verticle[0]); - return doDeploy(identifierProvider.apply(verticlesArray[0]), options, parentContext, callingContext, tccl, verticlesArray); - } - - private Future doDeploy(String identifier, - DeploymentOptions options, - ContextInternal parentContext, - ContextInternal callingContext, - ClassLoader tccl, - Verticle... verticles) { - Promise promise = callingContext.promise(); - Deployment parent = parentContext.getDeployment(); - String deploymentID = generateDeploymentID(); - - AtomicInteger deployCount = new AtomicInteger(); - AtomicBoolean failureReported = new AtomicBoolean(); - WorkerPool workerPool = null; - ThreadingModel mode = options.getThreadingModel(); - if (mode == null) { - mode = ThreadingModel.EVENT_LOOP; - } - if (mode != ThreadingModel.VIRTUAL_THREAD) { - if (options.getWorkerPoolName() != null) { - workerPool = vertx.createSharedWorkerPool(options.getWorkerPoolName(), options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()); - } - } else { - if (!vertx.isVirtualThreadAvailable()) { - return callingContext.failedFuture("This Java runtime does not support virtual threads"); - } - } - EventLoop workerLoop = null; - DeploymentImpl deployment = new DeploymentImpl(parent, workerPool, deploymentID, identifier, options); - synchronized (deploying) { - deploying.put(deploymentID, deployment); - } - for (Verticle verticle: verticles) { - CloseFuture closeFuture = new CloseFuture(log); - ContextImpl context; - switch (mode) { - default: - context = vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl); - break; - case WORKER: - if (workerLoop == null) { - context = vertx.createWorkerContext(deployment, closeFuture, workerPool, tccl); - workerLoop = context.nettyEventLoop(); - } else { - context = vertx.createWorkerContext(deployment, closeFuture, workerLoop, workerPool, tccl); - } - break; - case VIRTUAL_THREAD: - if (workerLoop == null) { - context = vertx.createVirtualThreadContext(deployment, closeFuture, tccl); - workerLoop = context.nettyEventLoop(); - } else { - context = vertx.createVirtualThreadContext(deployment, closeFuture, workerLoop, tccl); - } - break; - } - VerticleHolder holder = new VerticleHolder(verticle, context, closeFuture); - Promise startPromise = context.promise(); - holder.startPromise = startPromise; - deployment.addVerticle(holder); - context.runOnContext(v -> { - try { - verticle.init(vertx, context); - Future startFuture = startPromise.future(); - verticle.start(startPromise); - startFuture.onComplete(ar -> { - holder.startPromise = null; - if (ar.succeeded()) { - if (parent != null) { - if (parent.addChild(deployment)) { - deployment.child = true; - } else { - // Orphan - deployment.doUndeploy(vertx.getOrCreateContext()).onComplete(ar2 -> promise.fail("Verticle deployment failed.Could not be added as child of parent verticle")); - return; - } - } - deployments.put(deploymentID, deployment); - if (deployCount.incrementAndGet() == verticles.length) { - synchronized (deploying) { - deploying.remove(deploymentID); - } - promise.complete(deployment); - } - } else if (failureReported.compareAndSet(false, true)) { - deployment.rollback(callingContext, promise, context, holder, ar.cause()); - } - }); - } catch (Throwable t) { - if (failureReported.compareAndSet(false, true)) - deployment.rollback(callingContext, promise, context, holder, t); - } - }); - } - - return promise.future(); - } - - static class VerticleHolder { - - final Verticle verticle; - final ContextImpl context; - final CloseFuture closeFuture; - Promise startPromise; - - VerticleHolder(Verticle verticle, ContextImpl context, CloseFuture closeFuture) { - this.verticle = verticle; - this.context = context; - this.closeFuture = closeFuture; - } - - Future close() { - return closeFuture.close(); - } - } - - private class DeploymentImpl implements Deployment { - - private static final int ST_DEPLOYED = 0, ST_UNDEPLOYING = 1, ST_UNDEPLOYED = 2; - - private final Deployment parent; - private final String deploymentID; - private final JsonObject conf; - private final String verticleIdentifier; - private final List verticles = new CopyOnWriteArrayList<>(); - private final Set children = ConcurrentHashMap.newKeySet(); - private final WorkerPool workerPool; - private final DeploymentOptions options; - private Handler undeployHandler; - private int status = ST_DEPLOYED; - private volatile boolean child; - - private DeploymentImpl(Deployment parent, WorkerPool workerPool, String deploymentID, String verticleIdentifier, DeploymentOptions options) { - this.parent = parent; - this.deploymentID = deploymentID; - this.conf = options.getConfig() != null ? options.getConfig().copy() : new JsonObject(); - this.verticleIdentifier = verticleIdentifier; - this.options = options; - this.workerPool = workerPool; - } - - public void addVerticle(VerticleHolder holder) { - verticles.add(holder); - } - - private synchronized void rollback(ContextInternal callingContext, Promise completionPromise, ContextImpl context, VerticleHolder closeFuture, Throwable cause) { - if (status == ST_DEPLOYED) { - status = ST_UNDEPLOYING; - doUndeployChildren(callingContext).onComplete(childrenResult -> { - if (workerPool != null) { - workerPool.close(); - } - Handler handler; - synchronized (DeploymentImpl.this) { - status = ST_UNDEPLOYED; - handler = undeployHandler; - undeployHandler = null; - } - if (handler != null) { - try { - handler.handle(null); - } catch (Exception e) { - context.reportException(e); - } - } - if (childrenResult.failed()) { - completionPromise.fail(cause); - } else { - closeFuture.close().transform(ar -> Future.failedFuture(cause)).onComplete(completionPromise); - } - }); - } - } - - private synchronized Future doUndeployChildren(ContextInternal undeployingContext) { - if (!children.isEmpty()) { - List> childFuts = new ArrayList<>(); - for (Deployment childDeployment: new HashSet<>(children)) { - Promise p = Promise.promise(); - childFuts.add(p.future()); - childDeployment.doUndeploy(undeployingContext).onComplete(ar -> { - children.remove(childDeployment); - p.handle(ar); - }); - } - return Future.all(childFuts).mapEmpty(); - } else { - return Future.succeededFuture(); - } - } - - public synchronized Future doUndeploy(ContextInternal undeployingContext) { - if (status == ST_UNDEPLOYED) { - return Future.failedFuture(new IllegalStateException("Already undeployed")); - } - if (!children.isEmpty()) { - status = ST_UNDEPLOYING; - return doUndeployChildren(undeployingContext).compose(v -> doUndeploy(undeployingContext)); - } else { - status = ST_UNDEPLOYED; - List> undeployFutures = new ArrayList<>(); - if (parent != null) { - parent.removeChild(this); - } - for (VerticleHolder verticleHolder: verticles) { - ContextImpl context = verticleHolder.context; - Promise p = Promise.promise(); - undeployFutures.add(p.future()); - context.runOnContext(v -> { - Promise stopPromise = undeployingContext.promise(); - Future stopFuture = stopPromise.future(); - stopFuture - .eventually(() -> { - deployments.remove(deploymentID); - return verticleHolder - .close() - .onFailure(err -> log.error("Failed to run close hook", err)); - }).onComplete(p); - try { - verticleHolder.verticle.stop(stopPromise); - } catch (Throwable t) { - if (!stopPromise.tryFail(t)) { - undeployingContext.reportException(t); - } - } - }); - } - Promise resolvingPromise = undeployingContext.promise(); - Future.all(undeployFutures).mapEmpty().onComplete(resolvingPromise); - Future fut = resolvingPromise.future(); - if (workerPool != null) { - fut = fut.andThen(ar -> workerPool.close()); - } - Handler handler = undeployHandler; - if (handler != null) { - undeployHandler = null; - return fut.andThen(ar -> handler.handle(null)); - } - return fut; - } - } - - @Override - public String verticleIdentifier() { - return verticleIdentifier; - } - - @Override - public DeploymentOptions deploymentOptions() { - return options; - } - - @Override - public JsonObject config() { - return conf; - } - - @Override - public synchronized boolean addChild(Deployment deployment) { - if (status == ST_DEPLOYED) { - children.add(deployment); - return true; - } else { - return false; - } - } - - @Override - public void removeChild(Deployment deployment) { - children.remove(deployment); - } - - @Override - public Set getContexts() { - Set contexts = new HashSet<>(); - for (VerticleHolder holder: verticles) { - contexts.add(holder.context); - } - return contexts; - } - - @Override - public Set getVerticles() { - Set verts = new HashSet<>(); - for (VerticleHolder holder: verticles) { - verts.add(holder.verticle); - } - return verts; - } - - @Override - public void undeployHandler(Handler handler) { - synchronized (this) { - if (status != ST_UNDEPLOYED) { - undeployHandler = handler; - return; - } - } - handler.handle(null); - } - - @Override - public boolean isChild() { - return child; - } - - @Override - public String deploymentID() { - return deploymentID; - } - } - -} diff --git a/vertx-core/src/main/java/io/vertx/core/impl/DuplicatedContext.java b/vertx-core/src/main/java/io/vertx/core/impl/DuplicatedContext.java index 59626e4c951..48fc8e6e814 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/DuplicatedContext.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/DuplicatedContext.java @@ -15,6 +15,7 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.ThreadingModel; +import io.vertx.core.impl.deployment.Deployment; import io.vertx.core.internal.CloseFuture; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.EventExecutor; diff --git a/vertx-core/src/main/java/io/vertx/core/impl/HAManager.java b/vertx-core/src/main/java/io/vertx/core/impl/HAManager.java index 8c05d2bc3f5..ffdfa8c937f 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/HAManager.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/HAManager.java @@ -12,6 +12,9 @@ package io.vertx.core.impl; import io.vertx.core.*; +import io.vertx.core.impl.deployment.Deployment; +import io.vertx.core.impl.deployment.DeploymentManager; +import io.vertx.core.impl.verticle.VerticleManager; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.internal.VertxInternal; @@ -433,14 +436,14 @@ private void undeployHADeployments() { if (dep != null) { if (dep.deploymentOptions().isHa()) { ((VertxImpl)vertx).executeIsolated(v -> { - deploymentManager.undeployVerticle(deploymentID).onComplete(result -> { + deploymentManager.undeploy(deploymentID).onComplete(result -> { if (result.succeeded()) { - log.info("Successfully undeployed HA deployment " + deploymentID + "-" + dep.verticleIdentifier() + " as there is no quorum"); - addToHADeployList(dep.verticleIdentifier(), dep.deploymentOptions(), result1 -> { + log.info("Successfully undeployed HA deployment " + deploymentID + "-" + dep.identifier() + " as there is no quorum"); + addToHADeployList(dep.identifier(), dep.deploymentOptions(), result1 -> { if (result1.succeeded()) { - log.info("Successfully redeployed verticle " + dep.verticleIdentifier() + " after quorum was re-attained"); + log.info("Successfully redeployed verticle " + dep.identifier() + " after quorum was re-attained"); } else { - log.error("Failed to redeploy verticle " + dep.verticleIdentifier() + " after quorum was re-attained", result1.cause()); + log.error("Failed to redeploy verticle " + dep.identifier() + " after quorum was re-attained", result1.cause()); } }); } else { diff --git a/vertx-core/src/main/java/io/vertx/core/impl/ShadowContext.java b/vertx-core/src/main/java/io/vertx/core/impl/ShadowContext.java index d2276b5aa5b..989a3dfa77e 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/ShadowContext.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/ShadowContext.java @@ -13,6 +13,7 @@ import io.netty.channel.EventLoop; import io.vertx.codegen.annotations.Nullable; import io.vertx.core.*; +import io.vertx.core.impl.deployment.Deployment; import io.vertx.core.internal.CloseFuture; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.EventExecutor; diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index 277eec6ae20..60123bf49de 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -33,6 +33,10 @@ import io.vertx.core.file.FileSystem; import io.vertx.core.http.*; import io.vertx.core.http.impl.*; +import io.vertx.core.impl.deployment.DefaultDeploymentManager; +import io.vertx.core.impl.deployment.Deployment; +import io.vertx.core.impl.deployment.DeploymentManager; +import io.vertx.core.impl.verticle.VerticleManager; import io.vertx.core.internal.*; import io.vertx.core.internal.net.NetClientInternal; import io.vertx.core.internal.threadchecker.BlockedThreadChecker; @@ -228,8 +232,8 @@ private static ThreadFactory virtualThreadFactory() { this.nodeSelector = nodeSelector; this.eventBus = clusterManager != null ? new ClusteredEventBus(this, options, clusterManager, nodeSelector) : new EventBusImpl(this); this.sharedData = new SharedDataImpl(this, clusterManager); - this.deploymentManager = new DeploymentManager(this); - this.verticleManager = new VerticleManager(this, deploymentManager); + this.deploymentManager = new DefaultDeploymentManager(this); + this.verticleManager = new VerticleManager(this, DefaultDeploymentManager.log, deploymentManager); this.eventExecutorProvider = eventExecutorProvider; } @@ -820,7 +824,8 @@ private Future deployVerticle(Callable verticleSupplier, Deplo // If we are closed use a context less future return Future.failedFuture("Vert.x closed"); } else { - return deploymentManager.deployVerticle(verticleSupplier, options); + ContextInternal currentContext = getOrCreateContext(); + return verticleManager.deployVerticle2(currentContext, verticleSupplier, options); } } @@ -836,7 +841,7 @@ public Future undeploy(String deploymentID) { } else { future = getOrCreateContext().succeededFuture(); } - return future.compose(v -> deploymentManager.undeployVerticle(deploymentID)); + return future.compose(v -> deploymentManager.undeploy(deploymentID)); } @Override @@ -1121,7 +1126,7 @@ private synchronized WorkerPool createSharedWorkerPool(CloseFuture closeFuture, }); return new WorkerPool(shared.executor(), shared.metrics()) { @Override - void close() { + public void close() { closeFuture.close(); } }; diff --git a/vertx-core/src/main/java/io/vertx/core/impl/WorkerPool.java b/vertx-core/src/main/java/io/vertx/core/impl/WorkerPool.java index 5c191728db7..d2ffa228b5a 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/WorkerPool.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/WorkerPool.java @@ -43,7 +43,7 @@ public PoolMetrics metrics() { return metrics; } - void close() { + public void close() { if (metrics != null) { metrics.close(); } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/deployment/DefaultDeploymentManager.java b/vertx-core/src/main/java/io/vertx/core/impl/deployment/DefaultDeploymentManager.java new file mode 100644 index 00000000000..9162d290d1e --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/impl/deployment/DefaultDeploymentManager.java @@ -0,0 +1,277 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.impl.deployment; + +import io.vertx.core.*; +import io.vertx.core.impl.VertxImpl; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.logging.Logger; +import io.vertx.core.internal.logging.LoggerFactory; +import io.vertx.core.json.JsonObject; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author Tim Fox + */ +public class DefaultDeploymentManager implements DeploymentManager { + + public static final Logger log = LoggerFactory.getLogger(DefaultDeploymentManager.class); + + private final VertxImpl vertx; + private final Map deploying = new HashMap<>(); + private final Map deployments = new ConcurrentHashMap<>(); + + public DefaultDeploymentManager(VertxImpl vertx) { + this.vertx = vertx; + } + + private String generateDeploymentID() { + return UUID.randomUUID().toString(); + } + + public Future undeploy(String deploymentID) { + Deployment deployment = deployments.get(deploymentID); + ContextInternal currentContext = vertx.getOrCreateContext(); + if (deployment == null) { + return currentContext.failedFuture(new IllegalStateException("Unknown deployment")); + } else { + return deployment.doUndeploy(vertx.getOrCreateContext()); + } + } + + public Set deployments() { + return Collections.unmodifiableSet(deployments.keySet()); + } + + public Deployment getDeployment(String deploymentID) { + return deployments.get(deploymentID); + } + + public Future undeployAll() { + // TODO timeout if it takes too long - e.g. async stop verticle fails to call future + + // We only deploy the top level verticles as the children will be undeployed when the parent is + while (true) { + DeploymentImpl deployment; + synchronized (deploying) { + if (deploying.isEmpty()) { + break; + } + Iterator> it = deploying.entrySet().iterator(); + Map.Entry entry = it.next(); + it.remove(); + deployment = (DeploymentImpl) entry.getValue(); + } + deployment.deployable.undeploy().andThen(ar -> deployments.remove(deployment.deploymentID)); + } + Set deploymentIDs = new HashSet<>(); + for (Map.Entry entry: deployments.entrySet()) { + if (!entry.getValue().isChild()) { + deploymentIDs.add(entry.getKey()); + } + } + List> completionList = new ArrayList<>(); + if (!deploymentIDs.isEmpty()) { + for (String deploymentID : deploymentIDs) { + Promise promise = Promise.promise(); + completionList.add(promise.future()); + undeploy(deploymentID).onComplete(ar -> { + if (ar.failed()) { + // Log but carry on regardless + log.error("Undeploy failed", ar.cause()); + } + promise.handle(ar); + }); + } + Promise promise = vertx.getOrCreateContext().promise(); + Future.join(completionList).mapEmpty().onComplete(promise); + return promise.future(); + } else { + return vertx.getOrCreateContext().succeededFuture(); + } + } + + public Future deploy(DeploymentOptions options, + ContextInternal parentContext, + ContextInternal callingContext, + Deployable deployable) { + String deploymentID = generateDeploymentID(); + Deployment parent = parentContext.getDeployment(); + DeploymentImpl deployment = new DeploymentImpl(deployable, parent, deploymentID, options); + synchronized (deploying) { + deploying.put(deploymentID, deployment); + } + Promise result = callingContext.promise(); + Future f = deployable.deploy(deployment); + f.onComplete(ar -> { + if (ar.succeeded()) { + deployments.put(deploymentID, deployment); + if (parent != null) { + if (parent.addChild(deployment)) { + deployment.child = true; + } else { + // Orphan + deployment + .doUndeploy(vertx.getOrCreateContext()) + .onComplete(ar2 -> { + result.fail("Deployment failed.Could not be added as child of parent deployment"); + }); + return; + } + } + synchronized (deploying) { + deploying.remove(deploymentID); + } + result.complete(deployment); + } else { + deployment + .rollback(callingContext) + .onComplete(ar2 -> result.fail(ar.cause())); + } + }); + return result.future(); + } + + private class DeploymentImpl implements Deployment { + + private static final int ST_DEPLOYED = 0, ST_UNDEPLOYING = 1, ST_UNDEPLOYED = 2; + + private final Deployable deployable; + private final Deployment parent; + private final String deploymentID; + private final JsonObject conf; + private final Set children = ConcurrentHashMap.newKeySet(); + private final DeploymentOptions options; + private int status = ST_DEPLOYED; + private volatile boolean child; + + private DeploymentImpl(Deployable deployable, + Deployment parent, + String deploymentID, + DeploymentOptions options) { + this.deployable = deployable; + this.parent = parent; + this.deploymentID = deploymentID; + this.conf = options.getConfig() != null ? options.getConfig().copy() : new JsonObject(); + this.options = options; + } + + private synchronized Future rollback(ContextInternal callingContext) { + if (status == ST_DEPLOYED) { + status = ST_UNDEPLOYING; + return doUndeployChildren(callingContext) + .transform(childrenResult -> { + synchronized (DeploymentImpl.this) { + status = ST_UNDEPLOYED; + } + if (childrenResult.failed()) { + return (Future)childrenResult; + } else { + return deployable.cleanup(); + } + }); + } else { + return callingContext.succeededFuture(); + } + } + + private synchronized Future doUndeployChildren(ContextInternal undeployingContext) { + if (!children.isEmpty()) { + List> childFuts = new ArrayList<>(); + for (Deployment childDeployment: new HashSet<>(children)) { + childFuts.add(childDeployment + .doUndeploy(undeployingContext) + .andThen(ar -> children.remove(childDeployment))); + } + return Future.all(childFuts); + } else { + return Future.succeededFuture(); + } + } + + public synchronized Future doUndeploy(ContextInternal undeployingContext) { + if (status == ST_UNDEPLOYED) { + return undeployingContext.failedFuture(new IllegalStateException("Already undeployed")); + } + if (!children.isEmpty()) { + status = ST_UNDEPLOYING; + return doUndeployChildren(undeployingContext) + .compose(v -> doUndeploy(undeployingContext)); + } else { + status = ST_UNDEPLOYED; + if (parent != null) { + parent.removeChild(this); + } + Future undeployFutures = deployable + .undeploy() + .andThen(ar -> deployments.remove(deploymentID)) + .eventually(deployable::cleanup); + return undeployingContext.future(p -> { + undeployFutures.onComplete(ar -> { + if (ar.succeeded()) { + p.complete(); + } else { + p.fail(ar.cause()); + } + }); + }); + } + } + + @Override + public String identifier() { + return deployable.identifier(); + } + + @Override + public DeploymentOptions deploymentOptions() { + return options; + } + + @Override + public JsonObject config() { + return conf; + } + + @Override + public synchronized boolean addChild(Deployment deployment) { + if (status == ST_DEPLOYED) { + children.add(deployment); + return true; + } else { + return false; + } + } + + @Override + public void removeChild(Deployment deployment) { + children.remove(deployment); + } + + @Override + public Deployable deployable() { + return deployable; + } + + @Override + public boolean isChild() { + return child; + } + + @Override + public String deploymentID() { + return deploymentID; + } + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployable.java b/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployable.java new file mode 100644 index 00000000000..72022ddc160 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployable.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.impl.deployment; + +import io.vertx.core.Future; +import io.vertx.core.Promise; + +/** + * Something we can deploy. + */ +public interface Deployable { + + String identifier(); + + Future deploy(Deployment deployment); + + Future undeploy(); + + Future cleanup(); + +} diff --git a/vertx-core/src/main/java/io/vertx/core/impl/Deployment.java b/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java similarity index 76% rename from vertx-core/src/main/java/io/vertx/core/impl/Deployment.java rename to vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java index 8783488f83e..d84222f42cc 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/Deployment.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java @@ -9,18 +9,13 @@ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 */ -package io.vertx.core.impl; +package io.vertx.core.impl.deployment; -import io.vertx.core.Context; import io.vertx.core.DeploymentOptions; import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Verticle; import io.vertx.core.internal.ContextInternal; import io.vertx.core.json.JsonObject; -import java.util.Set; - /** * @author Tim Fox */ @@ -36,15 +31,11 @@ public interface Deployment { String deploymentID(); - String verticleIdentifier(); + String identifier(); DeploymentOptions deploymentOptions(); - Set getContexts(); - - Set getVerticles(); - - void undeployHandler(Handler handler); + Deployable deployable(); boolean isChild(); diff --git a/vertx-core/src/main/java/io/vertx/core/impl/deployment/DeploymentManager.java b/vertx-core/src/main/java/io/vertx/core/impl/deployment/DeploymentManager.java new file mode 100644 index 00000000000..40a3368116c --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/impl/deployment/DeploymentManager.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.impl.deployment; + +import io.vertx.core.*; +import io.vertx.core.internal.ContextInternal; + +import java.util.*; + +/** + * @author Tim Fox + */ +public interface DeploymentManager { + + Future deploy(DeploymentOptions options, + ContextInternal parentContext, + ContextInternal callingContext, + Deployable verticleDeployable); + + Future undeploy(String deploymentID); + + Set deployments(); + + Deployment getDeployment(String deploymentID); + + Future undeployAll(); + + +} diff --git a/vertx-core/src/main/java/io/vertx/core/impl/JavaVerticleFactory.java b/vertx-core/src/main/java/io/vertx/core/impl/verticle/JavaVerticleFactory.java similarity index 94% rename from vertx-core/src/main/java/io/vertx/core/impl/JavaVerticleFactory.java rename to vertx-core/src/main/java/io/vertx/core/impl/verticle/JavaVerticleFactory.java index a849a35a51e..db9f68b8642 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/JavaVerticleFactory.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/verticle/JavaVerticleFactory.java @@ -9,11 +9,10 @@ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 */ -package io.vertx.core.impl; +package io.vertx.core.impl.verticle; import io.vertx.core.Promise; import io.vertx.core.Verticle; -import io.vertx.core.impl.verticle.CompilingClassLoader; import io.vertx.core.spi.VerticleFactory; import java.util.concurrent.Callable; diff --git a/vertx-core/src/main/java/io/vertx/core/impl/verticle/VerticleDeployable.java b/vertx-core/src/main/java/io/vertx/core/impl/verticle/VerticleDeployable.java new file mode 100644 index 00000000000..3c0248a1639 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/impl/verticle/VerticleDeployable.java @@ -0,0 +1,236 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.impl.verticle; + +import io.netty.channel.EventLoop; +import io.vertx.core.*; +import io.vertx.core.impl.*; +import io.vertx.core.impl.deployment.Deployable; +import io.vertx.core.impl.deployment.Deployment; +import io.vertx.core.internal.CloseFuture; +import io.vertx.core.internal.logging.Logger; + +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Function; + +public class VerticleDeployable implements Deployable { + + public static Deployable deployable(VertxImpl vertx, + Logger log, + DeploymentOptions options, + Function identifierProvider, + ClassLoader tccl, + Callable verticleSupplier) throws Exception { + int nbInstances = options.getInstances(); + Set verticles = Collections.newSetFromMap(new IdentityHashMap<>()); + for (int i = 0; i < nbInstances; i++) { + Verticle verticle; + try { + verticle = verticleSupplier.call(); + } catch (Exception e) { + throw e; + } + if (verticle == null) { + throw new VertxException("Supplied verticle is null", true); + } + verticles.add(verticle); + } + if (verticles.size() != nbInstances) { + throw new VertxException("Same verticle supplied more than once", true); + } + WorkerPool workerPool = null; + ThreadingModel mode = options.getThreadingModel(); + if (mode == null) { + mode = ThreadingModel.EVENT_LOOP; + } + if (mode != ThreadingModel.VIRTUAL_THREAD) { + if (options.getWorkerPoolName() != null) { + workerPool = vertx.createSharedWorkerPool(options.getWorkerPoolName(), options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()); + } + } else { + if (!vertx.isVirtualThreadAvailable()) { + throw new VertxException("This Java runtime does not support virtual threads", true); + } + } + ArrayList list = new ArrayList<>(verticles); + return new VerticleDeployable(vertx, log, list, identifierProvider.apply(list.get(0)), mode, workerPool, tccl); + } + + private final VertxImpl vertx; + private final Logger log; + private final List verticles; + private final ThreadingModel threading; + private final WorkerPool workerPool; + private final String identifier; + private final List holders = new CopyOnWriteArrayList<>(); + private final ClassLoader tccl; + + public VerticleDeployable(VertxImpl vertx, + Logger log, + List verticles, + String identifier, + ThreadingModel threading, + WorkerPool workerPool, + ClassLoader tccl) { + this.vertx = vertx; + this.log = log; + this.workerPool = workerPool; + this.verticles = verticles; + this.identifier = identifier; + this.threading = threading; + this.tccl = tccl; + } + + public Set contexts() { + Set contexts = new HashSet<>(); + for (VerticleHolder holder: holders) { + contexts.add(holder.context); + } + return contexts; + } + + public Set verticles() { + Set verts = new HashSet<>(); + for (VerticleHolder holder: holders) { + verts.add(holder.verticle); + } + return verts; + } + + @Override + public String identifier() { + return identifier; + } + + @Override + public Future deploy(Deployment deployment) { + EventLoop workerLoop = null; + List> futures = new ArrayList<>(); + for (Verticle verticle: verticles) { + CloseFuture closeFuture = new CloseFuture(log); + ContextImpl context; + switch (threading) { + case WORKER: + if (workerLoop == null) { + context = vertx.createWorkerContext(deployment, closeFuture, workerPool, tccl); + workerLoop = context.nettyEventLoop(); + } else { + context = vertx.createWorkerContext(deployment, closeFuture, workerLoop, workerPool, tccl); + } + break; + case VIRTUAL_THREAD: + if (workerLoop == null) { + context = vertx.createVirtualThreadContext(deployment, closeFuture, tccl); + workerLoop = context.nettyEventLoop(); + } else { + context = vertx.createVirtualThreadContext(deployment, closeFuture, workerLoop, tccl); + } + break; + default: + context = vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl); + break; + } + VerticleHolder holder = new VerticleHolder(verticle, context, closeFuture); + Promise startPromise = context.promise(); + holder.startPromise = startPromise; + holders.add(holder); + futures.add(startPromise + .future() + .andThen(ar -> { + if (ar.succeeded()) { + holder.startPromise = null; + } + })); + context.runOnContext(v -> { + try { + verticle.init(vertx, context); + verticle.start(startPromise); + } catch (Throwable t) { + startPromise.tryFail(t); + } + }); + } + return Future + .join(futures) + .transform(ar -> { + if (ar.failed()) { + return undeploy().transform(ar2 -> (Future) ar); + } else { + return Future.succeededFuture(); + } + }); + } + + @Override + public Future undeploy() { + List> undeployFutures = new ArrayList<>(); + for (VerticleHolder holder: holders) { + Promise startPromise = holder.startPromise; + if (startPromise != null) { + startPromise.tryFail(new VertxException("Verticle un-deployed", true)); + } else { + ContextImpl context = holder.context; + Promise p = Promise.promise(); + undeployFutures.add(p.future()); + context.runOnContext(v -> { + Promise stopPromise = Promise.promise(); + Future stopFuture = stopPromise.future(); + stopFuture + .eventually(() -> holder + .close() + .onFailure(err -> log.error("Failed to run close hook", err))).onComplete(p); + try { + holder.verticle.stop(stopPromise); + } catch (Throwable t) { + if (!stopPromise.tryFail(t)) { + context.reportException(t); + } + } + }); + } + } + return Future.join(undeployFutures); + } + + @Override + public Future cleanup() { + List> futs = new ArrayList<>(); + for (VerticleHolder holder : holders) { + futs.add(holder.closeFuture.close()); + } + Future fut = Future.join(futs); + if (workerPool != null) { + fut = fut.andThen(ar -> workerPool.close()); + workerPool.close(); + } + return fut; + } + + private static class VerticleHolder { + + final Verticle verticle; + final ContextImpl context; + final CloseFuture closeFuture; + Promise startPromise; + + VerticleHolder(Verticle verticle, ContextImpl context, CloseFuture closeFuture) { + this.verticle = verticle; + this.context = context; + this.closeFuture = closeFuture; + } + + Future close() { + return closeFuture.close(); + } + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VerticleManager.java b/vertx-core/src/main/java/io/vertx/core/impl/verticle/VerticleManager.java similarity index 77% rename from vertx-core/src/main/java/io/vertx/core/impl/VerticleManager.java rename to vertx-core/src/main/java/io/vertx/core/impl/verticle/VerticleManager.java index ce6ae245cae..90e8309d0ee 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VerticleManager.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/verticle/VerticleManager.java @@ -8,15 +8,20 @@ * * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 */ -package io.vertx.core.impl; +package io.vertx.core.impl.verticle; import io.vertx.core.DeploymentOptions; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.ServiceHelper; import io.vertx.core.Verticle; +import io.vertx.core.impl.*; +import io.vertx.core.impl.deployment.Deployable; +import io.vertx.core.impl.deployment.Deployment; +import io.vertx.core.impl.deployment.DeploymentManager; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.VertxInternal; +import io.vertx.core.internal.logging.Logger; import io.vertx.core.spi.VerticleFactory; import java.util.ArrayList; @@ -28,20 +33,23 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; /** * @author Tim Fox */ public class VerticleManager { - private final VertxInternal vertx; + private final Logger log; + private final VertxImpl vertx; private final DeploymentManager deploymentManager; private final Map> verticleFactories = new ConcurrentHashMap<>(); private final List defaultFactories = new ArrayList<>(); - public VerticleManager(VertxInternal vertx, DeploymentManager deploymentManager) { - this.vertx = vertx; + public VerticleManager(VertxInternal vertx, Logger log, DeploymentManager deploymentManager) { + this.vertx = (VertxImpl) vertx; this.deploymentManager = deploymentManager; + this.log = log; loadVerticleFactories(); } @@ -144,7 +152,7 @@ private static String getSuffix(int pos, String str) { } public Future deployVerticle(String identifier, - DeploymentOptions options) { + DeploymentOptions options) { ContextInternal callingContext = vertx.getOrCreateContext(); ClassLoader loader = options.getClassLoader(); if (loader == null) { @@ -202,7 +210,7 @@ private Future doDeployVerticle(VerticleFactory verticleFactory, return Future.failedFuture(e); } return p.future() - .compose(callable -> deploymentManager.doDeploy(options, v -> identifier, parentContext, callingContext, cl, callable)); + .compose(callable -> deployVerticle(options, v -> identifier, parentContext, callingContext, cl, callable)); } static ClassLoader getCurrentClassLoader() { @@ -212,4 +220,43 @@ static ClassLoader getCurrentClassLoader() { } return cl; } + + public Future deployVerticle2(ContextInternal parentContext, Callable verticleSupplier, DeploymentOptions options) { + if (options.getInstances() < 1) { + throw new IllegalArgumentException("Can't specify < 1 instances to deploy"); + } + options.checkIsolationNotDefined(); + ClassLoader cl = options.getClassLoader(); + if (cl == null) { + cl = Thread.currentThread().getContextClassLoader(); + if (cl == null) { + cl = getClass().getClassLoader(); + } + } + return deployVerticle( + options, + v -> "java:" + v.getClass().getName(), + parentContext, + parentContext, + cl, + verticleSupplier) + .map(Deployment::deploymentID); + } + public Future deployVerticle(DeploymentOptions options, + Function identifierProvider, + ContextInternal parentContext, + ContextInternal callingContext, + ClassLoader tccl, + Callable verticleSupplier) { + + + Deployable verticleDeployable; + try { + verticleDeployable = VerticleDeployable.deployable(vertx, log, options, identifierProvider, tccl, verticleSupplier); + } catch (Exception e) { + return callingContext.failedFuture(e); + } + + return deploymentManager.deploy(options, parentContext, callingContext, verticleDeployable); + } } diff --git a/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java b/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java index 498475bf859..667223563a3 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java @@ -15,6 +15,7 @@ import io.vertx.core.*; import io.vertx.core.Future; import io.vertx.core.impl.*; +import io.vertx.core.impl.deployment.Deployment; import io.vertx.core.impl.future.FailedFuture; import io.vertx.core.impl.future.PromiseImpl; import io.vertx.core.impl.future.SucceededFuture; diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java index afe6cfd7a22..4f11c789c5b 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java @@ -18,6 +18,7 @@ import io.vertx.core.*; import io.vertx.core.dns.impl.DnsAddressResolverProvider; import io.vertx.core.impl.*; +import io.vertx.core.impl.deployment.Deployment; import io.vertx.core.internal.threadchecker.BlockedThreadChecker; import io.vertx.core.net.NetServerOptions; import io.vertx.core.net.impl.NetServerInternal; diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java index 4b2bf70a70a..c598dc61fb7 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java @@ -23,6 +23,7 @@ import io.vertx.core.file.FileSystem; import io.vertx.core.http.*; import io.vertx.core.impl.*; +import io.vertx.core.impl.deployment.Deployment; import io.vertx.core.internal.threadchecker.BlockedThreadChecker; import io.vertx.core.net.NetClient; import io.vertx.core.net.NetClientOptions; diff --git a/vertx-core/src/main/java/module-info.java b/vertx-core/src/main/java/module-info.java index 53f3393ae3f..7aaac23f931 100644 --- a/vertx-core/src/main/java/module-info.java +++ b/vertx-core/src/main/java/module-info.java @@ -113,5 +113,7 @@ exports io.vertx.core.impl.transports to io.vertx.tests; exports io.vertx.core.net.impl.pkcs1 to io.vertx.tests; exports io.vertx.core.spi.cluster.impl.selector to io.vertx.tests; + exports io.vertx.core.impl.verticle to io.vertx.tests; + exports io.vertx.core.impl.deployment to io.vertx.tests; } diff --git a/vertx-core/src/test/java/io/vertx/test/core/AsyncTestBase.java b/vertx-core/src/test/java/io/vertx/test/core/AsyncTestBase.java index aa2126821e4..13bdaabb50f 100644 --- a/vertx-core/src/test/java/io/vertx/test/core/AsyncTestBase.java +++ b/vertx-core/src/test/java/io/vertx/test/core/AsyncTestBase.java @@ -51,7 +51,7 @@ public class AsyncTestBase { private boolean threadChecksEnabled = true; private volatile boolean tearingDown; private volatile Thread mainThread; - private volatile boolean lateFailure; + private volatile Throwable lateFailure; private Map threadNames = new ConcurrentHashMap<>(); @Rule public TestName name = new TestName(); @@ -66,7 +66,7 @@ protected void setUp() throws Exception { testCompleteCalled = false; awaitCalled = false; threadNames.clear(); - lateFailure = false; + lateFailure = null; } protected void tearDown() throws Exception { @@ -163,7 +163,9 @@ protected void afterAsyncTestBase() { // Throwable caught from non main thread throw new IllegalStateException("Assert or failure from non main thread but no await() on main thread", throwable); } - if (lateFailure) { + Throwable late = lateFailure; + if (late != null) { + late.printStackTrace(System.out); throw new IllegalStateException("Test reported a failure after completion"); } for (Map.Entry entry: threadNames.entrySet()) { @@ -180,7 +182,7 @@ protected void afterAsyncTestBase() { private void handleThrowable(Throwable t) { if (Thread.currentThread() != mainThread && testCompleteCalled) { - lateFailure = true; + lateFailure = t; throw new IllegalStateException("assert or failure occurred after test has completed", t); } throwable = t; diff --git a/vertx-core/src/test/java/io/vertx/tests/deployment/DeploymentTest.java b/vertx-core/src/test/java/io/vertx/tests/deployment/DeploymentTest.java index 1c4ac595578..3aeda5d75bd 100644 --- a/vertx-core/src/test/java/io/vertx/tests/deployment/DeploymentTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/deployment/DeploymentTest.java @@ -13,10 +13,13 @@ import io.netty.channel.EventLoop; import io.vertx.core.*; +import io.vertx.core.impl.verticle.VerticleDeployable; +import io.vertx.core.internal.CloseFuture; import io.vertx.core.internal.ContextInternal; -import io.vertx.core.impl.Deployment; +import io.vertx.core.impl.deployment.Deployment; import io.vertx.core.internal.VertxInternal; import io.vertx.core.json.JsonObject; +import io.vertx.test.core.Repeat; import io.vertx.test.core.TestUtils; import io.vertx.test.core.VertxTestBase; import org.junit.Ignore; @@ -535,7 +538,7 @@ public void testDeployUndeployMultipleInstancesUsingClassName() throws Exception assertWaitUntil(() -> deployCount.get() == numInstances); assertEquals(1, vertx.deploymentIDs().size()); Deployment deployment = ((VertxInternal) vertx).getDeployment(vertx.deploymentIDs().iterator().next()); - Set verticles = deployment.getVerticles(); + Set verticles = ((VerticleDeployable)deployment.deployable()).verticles(); assertEquals(numInstances, verticles.size()); CountDownLatch undeployLatch = new CountDownLatch(1); assertEquals(numInstances, deployCount.get()); @@ -839,7 +842,6 @@ public void testAsyncUndeployFailure() throws Exception { @Test public void testAsyncUndeployFailsAfterSuccess() { - waitFor(2); Verticle verticle = new AbstractVerticle() { @Override public void stop(Promise stopPromise) throws Exception { @@ -850,9 +852,6 @@ public void stop(Promise stopPromise) throws Exception { Context ctx = vertx.getOrCreateContext(); ctx.runOnContext(v1 -> { vertx.deployVerticle(verticle).onComplete(onSuccess(id -> { - ctx.exceptionHandler(err -> { - complete(); - }); vertx.undeploy(id).onComplete(onSuccess(v2 -> { complete(); })); @@ -1061,23 +1060,16 @@ public void start() { public void testGetInstanceCountMultipleVerticles() throws Exception { AtomicInteger messageCount = new AtomicInteger(0); AtomicInteger totalReportedInstances = new AtomicInteger(0); - vertx.eventBus().consumer("instanceCount", event -> { - messageCount.incrementAndGet(); totalReportedInstances.addAndGet((int)event.body()); - if(messageCount.intValue() == 3) { - assertEquals(9, totalReportedInstances.get()); - testComplete(); - } + messageCount.incrementAndGet(); }); - - vertx.deployVerticle(TestVerticle3.class.getCanonicalName(), new DeploymentOptions().setInstances(3)) - .onComplete(onSuccess(v -> {})); - await(); + awaitFuture(vertx.deployVerticle(TestVerticle3.class.getCanonicalName(), new DeploymentOptions().setInstances(3))); + assertWaitUntil(() -> messageCount.get() == 3); + assertEquals(9, totalReportedInstances.get()); + assertWaitUntil(() -> vertx.deploymentIDs().size() == 1); Deployment deployment = ((VertxInternal) vertx).getDeployment(vertx.deploymentIDs().iterator().next()); - CountDownLatch latch = new CountDownLatch(1); - vertx.undeploy(deployment.deploymentID()).onComplete(ar -> latch.countDown()); - awaitLatch(latch); + awaitFuture(vertx.undeploy(deployment.deploymentID())); } @Test @@ -1095,7 +1087,7 @@ public void stop() { }; Verticle verticleParent = new AbstractVerticle() { @Override - public void start(Promise startPromise) throws Exception { + public void start(Promise startPromise) { vertx.deployVerticle(verticleChild).onComplete(onFailure(v -> { startPromise.complete(); })); @@ -1299,6 +1291,56 @@ public void testUndeployParentDuringChildDeployment() throws Exception { await(); } + @Test + public void testDeployWithPartialFailure() { + testDeployWithPartialFailure(3, 2); + } + + private void testDeployWithPartialFailure(int numberOfInstances, int instanceToFail) { + AtomicInteger count = new AtomicInteger(); + Map> startPromises = Collections.synchronizedMap(new HashMap<>()); + Map stopped = Collections.synchronizedMap(new HashMap<>()); + Set closeHooks = Collections.synchronizedSet(new HashSet<>()); + Future fut = vertx.deployVerticle(() -> { + int idx = count.getAndIncrement(); + return new AbstractVerticle() { + @Override + public void start(Promise startPromise) { + ContextInternal ctx = (ContextInternal) context; + ctx.addCloseHook(completion -> { + closeHooks.add(idx); + completion.complete(); + }); + startPromises.put(idx, startPromise); + if (startPromises.size() == numberOfInstances) { + startPromises + .forEach((idx, p) -> { + if (idx != instanceToFail) { + p.tryComplete(); + } else { + p.tryFail("it-failed"); + } + }); + } + } + @Override + public void stop() { + stopped.put(idx, true); + } + }; + }, new DeploymentOptions().setInstances(numberOfInstances)); + fut.onComplete(onFailure(expected -> { + for (int j = 0;j < numberOfInstances;j++) { + if (instanceToFail != j) { + assertTrue(stopped.containsKey(j)); + } + assertTrue(closeHooks.contains(j)); + } + testComplete(); + })); + await(); + } + @Test public void testCloseDeploymentInProgress() { Vertx vertx = Vertx.vertx(); diff --git a/vertx-core/src/test/java/io/vertx/tests/ha/ComplexHATest.java b/vertx-core/src/test/java/io/vertx/tests/ha/ComplexHATest.java index b1bb09b39fc..78c429c6ac4 100644 --- a/vertx-core/src/test/java/io/vertx/tests/ha/ComplexHATest.java +++ b/vertx-core/src/test/java/io/vertx/tests/ha/ComplexHATest.java @@ -15,7 +15,7 @@ import io.vertx.core.DeploymentOptions; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; -import io.vertx.core.impl.Deployment; +import io.vertx.core.impl.deployment.Deployment; import io.vertx.core.internal.VertxInternal; import io.vertx.core.json.JsonObject; import io.vertx.core.spi.cluster.ClusterManager; @@ -230,7 +230,7 @@ protected int checkHasDeployments(int pos, int prevPos) { for (Deployment prev: prevSet) { boolean contains = false; for (Deployment curr: currSet) { - if (curr.verticleIdentifier().equals(prev.verticleIdentifier()) && curr.deploymentOptions().toJson().equals(prev.deploymentOptions().toJson())) { + if (curr.identifier().equals(prev.identifier()) && curr.deploymentOptions().toJson().equals(prev.deploymentOptions().toJson())) { contains = true; break; } diff --git a/vertx-core/src/test/java/io/vertx/tests/ha/HATest.java b/vertx-core/src/test/java/io/vertx/tests/ha/HATest.java index 1d07249239d..aa634e9fafa 100644 --- a/vertx-core/src/test/java/io/vertx/tests/ha/HATest.java +++ b/vertx-core/src/test/java/io/vertx/tests/ha/HATest.java @@ -14,7 +14,7 @@ import io.vertx.core.DeploymentOptions; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; -import io.vertx.core.impl.Deployment; +import io.vertx.core.impl.deployment.Deployment; import io.vertx.core.internal.VertxInternal; import io.vertx.core.json.JsonObject; import io.vertx.core.spi.cluster.ClusterManager; @@ -266,7 +266,7 @@ public void testNonHADeployments() throws Exception { assertTrue(vertx1.deploymentIDs().size() == 1); String depID = vertx1.deploymentIDs().iterator().next(); - assertTrue(((VertxInternal) vertx1).getDeployment(depID).verticleIdentifier().equals("java:" + HAVerticle1.class.getName())); + assertTrue(((VertxInternal) vertx1).getDeployment(depID).identifier().equals("java:" + HAVerticle1.class.getName())); } @Test @@ -294,7 +294,7 @@ public void testCloseRemovesFromCluster() throws Exception { assertTrue(vertx1.deploymentIDs().size() == 1); String depID = vertx1.deploymentIDs().iterator().next(); - assertTrue(((VertxInternal) vertx1).getDeployment(depID).verticleIdentifier().equals("java:" + HAVerticle1.class.getName())); + assertTrue(((VertxInternal) vertx1).getDeployment(depID).identifier().equals("java:" + HAVerticle1.class.getName())); } @Test @@ -390,7 +390,7 @@ protected void checkDeploymentExists(int pos, String verticleName, DeploymentOpt VertxInternal vi = (VertxInternal)vertices[pos]; for (String deploymentID: vi.deploymentIDs()) { Deployment dep = vi.getDeployment(deploymentID); - if (verticleName.equals(dep.verticleIdentifier()) && options.toJson().equals(dep.deploymentOptions().toJson())) { + if (verticleName.equals(dep.identifier()) && options.toJson().equals(dep.deploymentOptions().toJson())) { return; } }