diff --git a/src/main/java/io/cryostat/discovery/DiscoveryModule.java b/src/main/java/io/cryostat/discovery/DiscoveryModule.java index 830ffe1697..dfc6289cf8 100644 --- a/src/main/java/io/cryostat/discovery/DiscoveryModule.java +++ b/src/main/java/io/cryostat/discovery/DiscoveryModule.java @@ -27,6 +27,7 @@ import io.cryostat.configuration.CredentialsManager; import io.cryostat.configuration.Variables; import io.cryostat.core.log.Logger; +import io.cryostat.core.sys.Clock; import io.cryostat.core.sys.Environment; import io.cryostat.messaging.notifications.NotificationFactory; import io.cryostat.platform.PlatformModule; @@ -77,9 +78,11 @@ static DiscoveryStorage provideDiscoveryStorage( Lazy matchExpressionEvaluator, Gson gson, WebClient http, + Clock clock, Logger logger) { return new DiscoveryStorage( deployer, + Executors.newSingleThreadScheduledExecutor(), Executors.newCachedThreadPool(), pingPeriod, builtin, @@ -89,6 +92,7 @@ static DiscoveryStorage provideDiscoveryStorage( matchExpressionEvaluator, gson, http, + clock, logger); } diff --git a/src/main/java/io/cryostat/discovery/DiscoveryStorage.java b/src/main/java/io/cryostat/discovery/DiscoveryStorage.java index 95a28efee2..a32a6a772c 100644 --- a/src/main/java/io/cryostat/discovery/DiscoveryStorage.java +++ b/src/main/java/io/cryostat/discovery/DiscoveryStorage.java @@ -28,7 +28,11 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import javax.script.ScriptException; @@ -38,6 +42,7 @@ import io.cryostat.configuration.StoredCredentials; import io.cryostat.core.log.Logger; import io.cryostat.core.net.discovery.JvmDiscoveryClient.EventKind; +import io.cryostat.core.sys.Clock; import io.cryostat.platform.ServiceRef; import io.cryostat.platform.ServiceRef.AnnotationKey; import io.cryostat.platform.discovery.AbstractNode; @@ -52,23 +57,22 @@ import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import dagger.Lazy; -import io.vertx.core.CompositeFuture; -import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; import io.vertx.ext.auth.authentication.UsernamePasswordCredentials; import io.vertx.ext.web.client.HttpRequest; -import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; public class DiscoveryStorage extends AbstractPlatformClientVerticle { public static final URI NO_CALLBACK = null; private final Duration pingPeriod; private final VerticleDeployer deployer; + private final ScheduledExecutorService scheduler; private final ExecutorService executor; private final Lazy builtin; private final PluginInfoDao dao; @@ -76,17 +80,20 @@ public class DiscoveryStorage extends AbstractPlatformClientVerticle { private final Lazy credentialsManager; private final Lazy matchExpressionEvaluator; private final Gson gson; + private final Clock clock; private final WebClient http; private final Logger logger; - private long pluginPruneTimerId = -1L; - private long targetRetryTimeId = -1L; + private ScheduledFuture pluginPruneTask; + private ScheduledFuture targetRetryTask; - private final Map nonConnectableTargets = new HashMap<>(); + private final Map, ConnectionAttemptRecord> nonConnectableTargets = + new ConcurrentHashMap<>(); public static final String DISCOVERY_STARTUP_ADDRESS = "discovery-startup"; DiscoveryStorage( VerticleDeployer deployer, + ScheduledExecutorService scheduler, ExecutorService executor, Duration pingPeriod, Lazy builtin, @@ -96,8 +103,10 @@ public class DiscoveryStorage extends AbstractPlatformClientVerticle { Lazy matchExpressionEvaluator, Gson gson, WebClient http, + Clock clock, Logger logger) { this.deployer = deployer; + this.scheduler = scheduler; this.executor = executor; this.pingPeriod = pingPeriod; this.builtin = builtin; @@ -107,55 +116,41 @@ public class DiscoveryStorage extends AbstractPlatformClientVerticle { this.matchExpressionEvaluator = matchExpressionEvaluator; this.gson = gson; this.http = http; + this.clock = clock; this.logger = logger; } @Override public void start(Promise future) throws Exception { pingPrune() - .onSuccess( - cf -> - deployer.deploy(builtin.get(), true) - .onSuccess(ar -> future.complete()) - .onFailure(t -> future.fail((Throwable) t)) - .eventually( - m -> - getVertx() - .eventBus() - .send( - DISCOVERY_STARTUP_ADDRESS, - "Discovery storage" - + " deployed"))) - .onFailure(future::fail); - - this.pluginPruneTimerId = getVertx().setPeriodic(pingPeriod.toMillis(), i -> pingPrune()); - this.targetRetryTimeId = - getVertx() - .setPeriodic( - // TODO make this configurable, use an exponential backoff, have a - // maximum retry policy, etc. - 15_000, - i -> { - testNonConnectedTargets( - entry -> { - TargetNode targetNode = entry.getKey(); - try { - String id = - jvmIdHelper - .get() - .resolveId( - targetNode.getTarget()) - .getJvmId(); - return StringUtils.isNotBlank(id); - } catch (JvmIdGetException e) { - logger.info( - "Retain null jvmId for node [{}]", - targetNode.getName()); - logger.info(e); - return false; - } - }); - }); + .whenComplete( + (v, ex) -> { + if (ex != null) { + future.fail(ex); + return; + } + deployer.deploy(builtin.get(), true) + .onSuccess(ar -> future.complete()) + .onFailure(t -> future.fail((Throwable) t)) + .eventually( + m -> + getVertx() + .eventBus() + .send( + DISCOVERY_STARTUP_ADDRESS, + "Discovery storage deployed")); + }); + + this.pluginPruneTask = + scheduler.scheduleAtFixedRate( + this::pingPrune, + pingPeriod.toMillis(), + pingPeriod.toMillis(), + TimeUnit.MILLISECONDS); + // TODO make this configurable + this.targetRetryTask = + scheduler.scheduleAtFixedRate( + this::checkNonConnectedTargetJvmIds, 2, 2, TimeUnit.SECONDS); this.credentialsManager .get() .addListener( @@ -165,11 +160,14 @@ public void start(Promise future) throws Exception { testNonConnectedTargets( entry -> { try { - return matchExpressionEvaluator - .get() - .applies( - event.getPayload(), - entry.getKey().getTarget()); + ServiceRef target = entry.getKey().getTarget(); + boolean credentialsApply = + matchExpressionEvaluator + .get() + .applies( + event.getPayload(), + target); + return credentialsApply && testJvmId(target); } catch (ScriptException e) { logger.error(e); return false; @@ -183,17 +181,79 @@ public void start(Promise future) throws Exception { event.getEventType().toString()); } }); + + this.addTargetDiscoveryListener( + tde -> { + switch (tde.getEventKind()) { + case MODIFIED: + testNonConnectedTargets( + entry -> + Objects.equals( + tde.getServiceRef(), + entry.getKey().getTarget())); + break; + case LOST: + var it = nonConnectableTargets.entrySet().iterator(); + while (it.hasNext()) { + var entry = it.next(); + if (Objects.equals( + tde.getServiceRef(), entry.getKey().getKey().getTarget())) { + it.remove(); + } + } + break; + default: + break; + } + }); + } + + private void checkNonConnectedTargetJvmIds() { + testNonConnectedTargets( + entry -> { + TargetNode targetNode = entry.getKey(); + ConnectionAttemptRecord attemptRecord = nonConnectableTargets.get(entry); + // TODO make this configurable, use an exponential backoff, have a + // maximum retry policy, etc. + long nextAttempt = + (attemptRecord.attemptCount * attemptRecord.attemptCount) + + attemptRecord.lastAttemptTimestamp; + attemptRecord.attemptCount++; + long now = clock.now().getEpochSecond(); + if (now < nextAttempt) { + return false; + } + long elapsed = + attemptRecord.lastAttemptTimestamp + - attemptRecord.firstAttemptTimestamp; + if (elapsed > ConnectionAttemptRecord.MAX_ATTEMPT_INTERVAL) { + return false; + } + return testJvmId(targetNode.getTarget()); + }); + } + + private boolean testJvmId(ServiceRef serviceRef) { + try { + String id = jvmIdHelper.get().resolveId(serviceRef).getJvmId(); + return StringUtils.isNotBlank(id); + } catch (JvmIdGetException e) { + logger.trace("Retain null jvmId for target [{}]", serviceRef.getServiceUri()); + logger.trace(e); + return false; + } } private void testNonConnectedTargets(Predicate> predicate) { - executor.execute( - () -> { - Map copy = new HashMap<>(nonConnectableTargets); - for (var entry : copy.entrySet()) { + Map, ConnectionAttemptRecord> copy = + new HashMap<>(nonConnectableTargets); + for (var entry : copy.entrySet()) { + executor.submit( + () -> { try { - if (predicate.test(entry)) { + if (predicate.test(entry.getKey())) { nonConnectableTargets.remove(entry.getKey()); - UUID id = entry.getValue(); + UUID id = entry.getKey().getValue(); PluginInfo plugin = getById(id).orElseThrow(); EnvironmentNode original = gson.fromJson(plugin.getSubtree(), EnvironmentNode.class); @@ -202,39 +262,42 @@ private void testNonConnectedTargets(Predicate> predicat } catch (JsonSyntaxException e) { throw new RuntimeException(e); } - } - }); + }); + } } @Override public void stop() { - getVertx().cancelTimer(pluginPruneTimerId); - getVertx().cancelTimer(targetRetryTimeId); + if (this.pluginPruneTask != null) { + this.pluginPruneTask.cancel(false); + } + if (this.targetRetryTask != null) { + this.targetRetryTask.cancel(false); + } } - private CompositeFuture pingPrune() { - List futures = + private CompletableFuture pingPrune() { + List> futures = dao.getAll().stream() .map( plugin -> { UUID key = plugin.getId(); URI uri = plugin.getCallback(); - return (Future) - ping(HttpMethod.POST, uri) - .onSuccess( - res -> { - if (!Boolean.TRUE.equals(res)) { - removePlugin(key, uri); - } - }); + return ping(HttpMethod.POST, uri) + .whenComplete( + (v, t) -> { + if (t != null || !Boolean.TRUE.equals(v)) { + removePlugin(key, uri); + } + }); }) .toList(); - return CompositeFuture.join(futures); + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } - private Future ping(HttpMethod mtd, URI uri) { + private CompletableFuture ping(HttpMethod mtd, URI uri) { if (Objects.equals(uri, NO_CALLBACK)) { - return Future.succeededFuture(true); + return CompletableFuture.completedFuture(true); } HttpRequest req = http.request(mtd, uri.getPort(), uri.getHost(), uri.getPath()) @@ -253,27 +316,34 @@ private Future ping(HttpMethod mtd, URI uri) { credentials.getCredentials().getUsername(), credentials.getCredentials().getPassword())); } - return req.send() - .onComplete( - ar -> { - if (ar.failed()) { - logger.info( - "{} {} failed: {}", - mtd, - uri, - ExceptionUtils.getStackTrace(ar.cause())); - return; - } - logger.info( - "{} {} status {}: {}", - mtd, - uri, - ar.result().statusCode(), - ar.result().statusMessage()); - }) - .map(HttpResponse::statusCode) - .map(HttpStatusCodeIdentifier::isSuccessCode) - .otherwise(false); + final HttpRequest freq = req; + CompletableFuture result = new CompletableFuture<>(); + executor.submit( + () -> { + freq.send() + .onComplete( + ar -> { + if (ar.failed()) { + logger.info( + "{} {} failed: {}", + mtd, + uri, + ExceptionUtils.getStackTrace(ar.cause())); + result.completeExceptionally(ar.cause()); + return; + } + logger.info( + "{} {} status {}: {}", + mtd, + uri, + ar.result().statusCode(), + ar.result().statusMessage()); + result.complete( + HttpStatusCodeIdentifier.isSuccessCode( + ar.result().statusCode())); + }); + }); + return result; } private Optional getStoredCredentials(URI uri) { @@ -312,9 +382,7 @@ public UUID register(String realm, URI callback) throws RegistrationException { // FIXME this method should return a Future and be performed async Objects.requireNonNull(realm, "realm"); try { - CompletableFuture cf = new CompletableFuture<>(); - ping(HttpMethod.GET, callback) - .onComplete(ar -> cf.complete(ar.succeeded() && ar.result())); + CompletableFuture cf = ping(HttpMethod.GET, callback); if (!cf.get()) { throw new Exception("callback ping failure"); } @@ -360,7 +428,11 @@ private List modifyChildrenWithJvmIds( } catch (Exception e) { logger.info("Update node [{}] with null jvmId", child.getName()); logger.info(e); - nonConnectableTargets.putIfAbsent((TargetNode) child, id); + ConnectionAttemptRecord attemptRecord = new ConnectionAttemptRecord(); + attemptRecord.firstAttemptTimestamp = clock.now().getEpochSecond(); + attemptRecord.lastAttemptTimestamp = attemptRecord.firstAttemptTimestamp; + nonConnectableTargets.putIfAbsent( + Pair.of((TargetNode) child, id), attemptRecord); } modifiedChildren.add(child); } else if (child instanceof EnvironmentNode) { @@ -473,4 +545,11 @@ public static class NotFoundException extends RuntimeException { super(String.format("Unknown registration id: [%s]", id.toString())); } } + + private static class ConnectionAttemptRecord { + static final long MAX_ATTEMPT_INTERVAL = 60; // seconds from first try to last try + long attemptCount; + long firstAttemptTimestamp; + long lastAttemptTimestamp; + } } diff --git a/src/main/java/io/cryostat/net/NetworkModule.java b/src/main/java/io/cryostat/net/NetworkModule.java index e07a03c676..b7954b7641 100644 --- a/src/main/java/io/cryostat/net/NetworkModule.java +++ b/src/main/java/io/cryostat/net/NetworkModule.java @@ -126,6 +126,7 @@ static TargetConnectionManager provideTargetConnectionManager( DiscoveryStorage storage, @Named(Variables.TARGET_CACHE_TTL) Duration maxTargetTtl, @Named(Variables.TARGET_MAX_CONCURRENT_CONNECTIONS) int maxTargetConnections, + @Named(Variables.JMX_CONNECTION_TIMEOUT) long connectionTimeoutSeconds, Logger logger) { return new TargetConnectionManager( connectionToolkit, @@ -135,6 +136,7 @@ static TargetConnectionManager provideTargetConnectionManager( Scheduler.systemScheduler(), maxTargetTtl, maxTargetConnections, + connectionTimeoutSeconds, logger); } diff --git a/src/main/java/io/cryostat/net/TargetConnectionManager.java b/src/main/java/io/cryostat/net/TargetConnectionManager.java index b43cec3aca..2edb0dc089 100644 --- a/src/main/java/io/cryostat/net/TargetConnectionManager.java +++ b/src/main/java/io/cryostat/net/TargetConnectionManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -60,6 +61,7 @@ public class TargetConnectionManager { private final Lazy jfrConnectionToolkit; private final Lazy agentConnectionFactory; private final Executor executor; + private final long connectionTimeoutSeconds; private final Logger logger; private final AsyncLoadingCache connections; @@ -74,10 +76,12 @@ public class TargetConnectionManager { Scheduler scheduler, Duration ttl, int maxTargetConnections, + long connectionTimeoutSeconds, Logger logger) { this.jfrConnectionToolkit = jfrConnectionToolkit; this.agentConnectionFactory = agentConnectionFactory; this.executor = executor; + this.connectionTimeoutSeconds = connectionTimeoutSeconds; this.logger = logger; this.targetLocks = new ConcurrentHashMap<>(); @@ -134,7 +138,8 @@ public CompletableFuture executeConnectedTaskAsync( throw new CompletionException(e); } }, - executor); + executor) + .orTimeout(connectionTimeoutSeconds, TimeUnit.SECONDS); } } @@ -282,14 +287,15 @@ private class ConnectionLoader public CompletableFuture asyncLoad( ConnectionDescriptor key, Executor executor) throws Exception { return CompletableFuture.supplyAsync( - () -> { - try { - return connect(key); - } catch (Exception e) { - throw new CompletionException(e); - } - }, - executor); + () -> { + try { + return connect(key); + } catch (Exception e) { + throw new CompletionException(e); + } + }, + executor) + .orTimeout(connectionTimeoutSeconds, TimeUnit.SECONDS); } @Override diff --git a/src/main/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandler.java b/src/main/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandler.java index 6e6c8f952d..2bfa9fad18 100644 --- a/src/main/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandler.java +++ b/src/main/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandler.java @@ -140,7 +140,7 @@ public IntermediateResponse handle(RequestParameters params) throws ApiExc } private void cleanup(RequestParameters params, Rule rule) { - storage.listUniqueReachableServices().stream() + storage.listDiscoverableServices().stream() .forEach( (ServiceRef ref) -> { vertx.executeBlocking( @@ -155,7 +155,7 @@ private void cleanup(RequestParameters params, Rule rule) { new ConnectionDescriptor( targetId, credentials); recordings.stopRecording( - cd, rule.getRecordingName()); + cd, rule.getRecordingName(), true); } promise.complete(); } catch (Exception e) { diff --git a/src/main/java/io/cryostat/net/web/http/api/v2/RulePatchHandler.java b/src/main/java/io/cryostat/net/web/http/api/v2/RulePatchHandler.java index a3e59a538e..2ee969d820 100644 --- a/src/main/java/io/cryostat/net/web/http/api/v2/RulePatchHandler.java +++ b/src/main/java/io/cryostat/net/web/http/api/v2/RulePatchHandler.java @@ -154,7 +154,7 @@ public IntermediateResponse handle(RequestParameters params) throws ApiExc } private void cleanup(RequestParameters params, Rule rule) { - storage.listUniqueReachableServices().stream() + storage.listDiscoverableServices().stream() .forEach( (ServiceRef ref) -> { vertx.executeBlocking( @@ -169,7 +169,7 @@ private void cleanup(RequestParameters params, Rule rule) { new ConnectionDescriptor( targetId, credentials); recordings.stopRecording( - cd, rule.getRecordingName()); + cd, rule.getRecordingName(), true); } promise.complete(); } catch (Exception e) { diff --git a/src/main/java/io/cryostat/platform/PlatformClient.java b/src/main/java/io/cryostat/platform/PlatformClient.java index a1c9ad2b24..282ab7dd45 100644 --- a/src/main/java/io/cryostat/platform/PlatformClient.java +++ b/src/main/java/io/cryostat/platform/PlatformClient.java @@ -15,9 +15,7 @@ */ package io.cryostat.platform; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.function.Consumer; import io.cryostat.platform.discovery.EnvironmentNode; @@ -35,17 +33,10 @@ default void load(Promise promise) { List listDiscoverableServices(); - default List listUniqueReachableServices() { - Set uniqueIds = new HashSet<>(); - return listDiscoverableServices().stream() - .filter((ref) -> ref.getJvmId() != null && uniqueIds.add(ref.getJvmId())) - .toList(); - } - default boolean contains(ServiceRef ref) { var existingRef = - listUniqueReachableServices().stream() - .filter(sr -> !sr.equals(ref) && sr.getJvmId().equals(ref.getJvmId())) + listDiscoverableServices().stream() + .filter(sr -> sr.equals(ref) || sr.getJvmId().equals(ref.getJvmId())) .findAny(); return existingRef.isPresent(); } diff --git a/src/main/java/io/cryostat/recordings/JvmIdHelper.java b/src/main/java/io/cryostat/recordings/JvmIdHelper.java index 9fa402dc29..9b2357001d 100644 --- a/src/main/java/io/cryostat/recordings/JvmIdHelper.java +++ b/src/main/java/io/cryostat/recordings/JvmIdHelper.java @@ -114,20 +114,18 @@ public ServiceRef resolveId(ServiceRef sr) throws JvmIdGetException { URI serviceUri = sr.getServiceUri(); String uriStr = serviceUri.toString(); try { - CompletableFuture future = - this.targetConnectionManager.executeConnectedTaskAsync( - new ConnectionDescriptor(uriStr, credentialsManager.getCredentials(sr)), - JFRConnection::getJvmId); - future.thenAccept( - id -> { - String prevId = this.ids.synchronous().get(uriStr); - if (Objects.equals(prevId, id)) { - return; - } - this.ids.put(uriStr, CompletableFuture.completedFuture(id)); - logger.info("JVM ID: {} -> {}", uriStr, id); - }); - String id = future.get(connectionTimeoutSeconds, TimeUnit.SECONDS); + String id = + computeJvmId(uriStr, Optional.ofNullable(credentialsManager.getCredentials(sr))) + .whenComplete( + (i, t) -> { + String prevId = this.ids.synchronous().get(uriStr); + if (Objects.equals(prevId, i)) { + return; + } + this.ids.put(uriStr, CompletableFuture.completedFuture(i)); + logger.info("JVM ID: {} -> {}", uriStr, i); + }) + .get(); ServiceRef updated = new ServiceRef(id, serviceUri, sr.getAlias().orElse(uriStr)); updated.setLabels(sr.getLabels()); @@ -135,7 +133,7 @@ public ServiceRef resolveId(ServiceRef sr) throws JvmIdGetException { updated.setCryostatAnnotations(sr.getCryostatAnnotations()); reverse.put(id, sr); return updated; - } catch (InterruptedException | ExecutionException | TimeoutException | ScriptException e) { + } catch (InterruptedException | ExecutionException | ScriptException e) { logger.warn("Could not resolve jvmId for target {}", uriStr); throw new JvmIdGetException(e, uriStr); } diff --git a/src/main/java/io/cryostat/recordings/RecordingMetadataManager.java b/src/main/java/io/cryostat/recordings/RecordingMetadataManager.java index 9d338b9cfb..db88573327 100644 --- a/src/main/java/io/cryostat/recordings/RecordingMetadataManager.java +++ b/src/main/java/io/cryostat/recordings/RecordingMetadataManager.java @@ -412,8 +412,11 @@ public void accept(TargetDiscoveryEvent tde) { // ID and inform us of that occurrence, and use that invalidation // message to clear our stored metadata break; + case MODIFIED: + handleFoundTarget(tde.getServiceRef()); + break; default: - throw new UnsupportedOperationException(tde.getEventKind().toString()); + break; } }); } diff --git a/src/main/java/io/cryostat/rules/PeriodicArchiver.java b/src/main/java/io/cryostat/rules/PeriodicArchiver.java index 859050f048..3401115f01 100644 --- a/src/main/java/io/cryostat/rules/PeriodicArchiver.java +++ b/src/main/java/io/cryostat/rules/PeriodicArchiver.java @@ -43,7 +43,7 @@ class PeriodicArchiver implements Runnable { private final CredentialsManager credentialsManager; private final Rule rule; private final RecordingArchiveHelper recordingArchiveHelper; - private final Function, Void> failureNotifier; + private final Function, Void> failureNotifier; private final Logger logger; private final Queue previousRecordings; @@ -53,7 +53,7 @@ class PeriodicArchiver implements Runnable { CredentialsManager credentialsManager, Rule rule, RecordingArchiveHelper recordingArchiveHelper, - Function, Void> failureNotifier, + Function, Void> failureNotifier, Logger logger) { this.serviceRef = serviceRef; this.credentialsManager = credentialsManager; @@ -105,7 +105,7 @@ public void run() { if (AbstractAuthenticatedRequestHandler.isJmxAuthFailure(e) || AbstractAuthenticatedRequestHandler.isJmxSslFailure(e) || AbstractAuthenticatedRequestHandler.isServiceTypeFailure(e)) { - failureNotifier.apply(Pair.of(serviceRef, rule)); + failureNotifier.apply(Pair.of(serviceRef.getJvmId(), rule)); } } } diff --git a/src/main/java/io/cryostat/rules/PeriodicArchiverFactory.java b/src/main/java/io/cryostat/rules/PeriodicArchiverFactory.java index d9c1bbff10..bed9adf551 100644 --- a/src/main/java/io/cryostat/rules/PeriodicArchiverFactory.java +++ b/src/main/java/io/cryostat/rules/PeriodicArchiverFactory.java @@ -37,7 +37,7 @@ PeriodicArchiver create( CredentialsManager credentialsManager, Rule rule, RecordingArchiveHelper recordingArchiveHelper, - Function, Void> failureNotifier) { + Function, Void> failureNotifier) { return new PeriodicArchiver( serviceRef, credentialsManager, diff --git a/src/main/java/io/cryostat/rules/Rule.java b/src/main/java/io/cryostat/rules/Rule.java index 732b2914b2..e9bc185bef 100644 --- a/src/main/java/io/cryostat/rules/Rule.java +++ b/src/main/java/io/cryostat/rules/Rule.java @@ -185,13 +185,45 @@ private static String validateEventSpecifier(String eventSpecifier) } @Override - public boolean equals(Object o) { - return EqualsBuilder.reflectionEquals(this, o); + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other == this) { + return true; + } + if (!(other instanceof Rule)) { + return false; + } + Rule r = (Rule) other; + // ignore the enabled state + return new EqualsBuilder() + .append(name, r.name) + .append(description, r.description) + .append(matchExpression, r.matchExpression) + .append(eventSpecifier, r.eventSpecifier) + .append(archivalPeriodSeconds, r.archivalPeriodSeconds) + .append(initialDelaySeconds, r.initialDelaySeconds) + .append(preservedArchives, r.preservedArchives) + .append(maxAgeSeconds, r.maxAgeSeconds) + .append(maxSizeBytes, r.maxSizeBytes) + .build(); } @Override public int hashCode() { - return HashCodeBuilder.reflectionHashCode(this); + // ignore the enabled state + return new HashCodeBuilder() + .append(name) + .append(description) + .append(matchExpression) + .append(eventSpecifier) + .append(archivalPeriodSeconds) + .append(initialDelaySeconds) + .append(preservedArchives) + .append(maxAgeSeconds) + .append(maxSizeBytes) + .toHashCode(); } public static class Builder { diff --git a/src/main/java/io/cryostat/rules/RuleProcessor.java b/src/main/java/io/cryostat/rules/RuleProcessor.java index d38fb4cc8e..419050ef6d 100644 --- a/src/main/java/io/cryostat/rules/RuleProcessor.java +++ b/src/main/java/io/cryostat/rules/RuleProcessor.java @@ -16,22 +16,24 @@ package io.cryostat.rules; import java.io.IOException; -import java.time.Duration; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import javax.script.ScriptException; import org.openjdk.jmc.flightrecorder.configuration.recording.RecordingOptionsBuilder; import org.openjdk.jmc.rjmx.services.jfr.IRecordingDescriptor; +import org.openjdk.jmc.rjmx.services.jfr.IRecordingDescriptor.RecordingState; import io.cryostat.configuration.CredentialsManager; import io.cryostat.configuration.CredentialsManager.CredentialsEvent; @@ -54,11 +56,12 @@ import io.cryostat.util.events.EventListener; import io.vertx.core.AbstractVerticle; -import io.vertx.core.Vertx; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; public class RuleProcessor extends AbstractVerticle implements Consumer { + private final ScheduledExecutorService executor; private final PlatformClient platformClient; private final RuleRegistry registry; private final CredentialsManager credentialsManager; @@ -70,10 +73,10 @@ public class RuleProcessor extends AbstractVerticle implements Consumer, Set> tasks; + private final Map, Future> tasks; RuleProcessor( - Vertx vertx, + ScheduledExecutorService executor, PlatformClient platformClient, RuleRegistry registry, CredentialsManager credentialsManager, @@ -84,7 +87,7 @@ public class RuleProcessor extends AbstractVerticle implements Consumer(); + this.tasks = new ConcurrentHashMap<>(); this.registry.addListener(this.ruleListener()); this.credentialsManager.addListener(this.credentialsListener()); @@ -109,35 +112,21 @@ public void start() { @Override public void stop() { this.platformClient.removeTargetDiscoveryListener(this); - this.tasks.forEach((ruleExecution, ids) -> ids.forEach(vertx::cancelTimer)); + this.tasks.forEach((ruleExecution, task) -> task.cancel(false)); this.tasks.clear(); } - public EventListener ruleListener() { + EventListener ruleListener() { return new EventListener() { @Override public void onEvent(Event event) { switch (event.getEventType()) { case ADDED: - vertx.>executeBlocking( - promise -> - promise.complete( - platformClient.listUniqueReachableServices()), - false, - result -> - result.result().stream() - .filter( - serviceRef -> - event.getPayload().isEnabled() - && registry.applies( - event.getPayload(), - serviceRef)) - .forEach( - serviceRef -> - activate( - event.getPayload(), - serviceRef))); + if (!event.getPayload().isEnabled()) { + break; + } + activateRule(event); break; case REMOVED: deactivate(event.getPayload(), null); @@ -146,54 +135,65 @@ public void onEvent(Event event) { if (!event.getPayload().isEnabled()) { deactivate(event.getPayload(), null); } else { - vertx.>executeBlocking( - promise -> - promise.complete( - platformClient.listUniqueReachableServices()), - false, - result -> - result.result().stream() - .filter( - serviceRef -> - registry.applies( - event.getPayload(), - serviceRef)) - .forEach( - serviceRef -> - activate( - event.getPayload(), - serviceRef))); + activateRule(event); } break; default: throw new UnsupportedOperationException(event.getEventType().toString()); } } + + private void activateRule(Event event) { + executor.submit( + () -> { + platformClient.listDiscoverableServices().stream() + .filter( + serviceRef -> + registry.applies( + event.getPayload(), serviceRef)) + .forEach( + serviceRef -> activate(event.getPayload(), serviceRef)); + }); + } }; } - public EventListener credentialsListener() { + EventListener credentialsListener() { return new EventListener() { @Override public void onEvent(Event event) { switch (event.getEventType()) { case ADDED: - credentialsManager - .resolveMatchingTargets(event.getPayload()) - .forEach( - sr -> { - registry.getRules(sr).stream() - .filter(Rule::isEnabled) - .forEach(rule -> activate(rule, sr)); - }); + activateRule(event); break; case REMOVED: + // check if we need to re-trigger on each target even though we are removing + // credentials. This targets the case where there may be multiple + // overlapping credentials applying to a single target. This is generally an + // invalid configuration, but if a credential definition is removed, we can + // fairly cheaply check if another set of credentials may apply and allow us + // to trigger a rule activation. + activateRule(event); break; default: throw new UnsupportedOperationException(event.getEventType().toString()); } } + + private void activateRule(Event event) { + executor.submit( + () -> { + credentialsManager + .resolveMatchingTargets(event.getPayload()) + .forEach( + sr -> { + registry.getRules(sr).stream() + .filter(Rule::isEnabled) + .forEach(rule -> activate(rule, sr)); + }); + }); + } }; } @@ -201,27 +201,36 @@ public void onEvent(Event event) { public synchronized void accept(TargetDiscoveryEvent tde) { switch (tde.getEventKind()) { case FOUND: - if (!platformClient.contains(tde.getServiceRef())) { - registry.getRules(tde.getServiceRef()) - .forEach( - rule -> { - if (rule.isEnabled()) { - activate(rule, tde.getServiceRef()); - } - }); - } + activateAllRulesFor(tde.getServiceRef()); break; case LOST: deactivate(null, tde.getServiceRef()); break; case MODIFIED: + activateAllRulesFor(tde.getServiceRef()); break; default: throw new UnsupportedOperationException(tde.getEventKind().toString()); } } + private void activateAllRulesFor(ServiceRef serviceRef) { + registry.getRules(serviceRef) + .forEach( + rule -> { + if (rule.isEnabled()) { + activate(rule, serviceRef); + } + }); + } + private void activate(Rule rule, ServiceRef serviceRef) { + if (StringUtils.isBlank(serviceRef.getJvmId())) { + this.logger.trace( + "Target {} has no JVM ID, aborting rule activation", + serviceRef.getServiceUri()); + } + Pair key = Pair.of(serviceRef.getJvmId(), rule); if (!rule.isEnabled()) { this.logger.trace( "Activating rule {} for target {} aborted, rule is disabled {} ", @@ -230,7 +239,7 @@ private void activate(Rule rule, ServiceRef serviceRef) { rule.isEnabled()); return; } - if (tasks.containsKey(Pair.of(serviceRef, rule))) { + if (tasks.containsKey(key)) { this.logger.trace( "Activating rule {} for target {} aborted, rule is already active", rule.getName(), @@ -240,71 +249,59 @@ private void activate(Rule rule, ServiceRef serviceRef) { this.logger.trace( "Activating rule {} for target {}", rule.getName(), serviceRef.getServiceUri()); - vertx.executeBlocking( - promise -> { + executor.submit( + () -> { + try { + Credentials credentials = credentialsManager.getCredentials(serviceRef); + if (rule.isArchiver()) { try { - Credentials creds = credentialsManager.getCredentials(serviceRef); - promise.complete(creds); - } catch (ScriptException e) { - promise.fail(e); + archiveRuleRecording( + new ConnectionDescriptor(serviceRef, credentials), rule); + } catch (Exception e) { + logger.error(e); } - }) - .onSuccess(c -> logger.trace("Rule activation successful")) - .onSuccess( - credentials -> { - if (rule.isArchiver()) { - try { - archiveRuleRecording( - new ConnectionDescriptor(serviceRef, credentials), - rule); - } catch (Exception e) { - logger.error(e); - } - } else { - try { - startRuleRecording( - new ConnectionDescriptor(serviceRef, credentials), - rule); - } catch (Exception e) { - logger.error(e); - } - - PeriodicArchiver periodicArchiver = - periodicArchiverFactory.create( - serviceRef, - credentialsManager, - rule, - recordingArchiveHelper, - this::archivalFailureHandler); - Pair key = Pair.of(serviceRef, rule); - Set ids = tasks.computeIfAbsent(key, k -> new HashSet<>()); - int initialDelay = rule.getInitialDelaySeconds(); - int archivalPeriodSeconds = rule.getArchivalPeriodSeconds(); - if (initialDelay <= 0) { - initialDelay = archivalPeriodSeconds; - } - if (rule.getPreservedArchives() <= 0 - || archivalPeriodSeconds <= 0) { + } else { + try { + if (!startRuleRecording( + new ConnectionDescriptor(serviceRef, credentials), rule)) { return; } - long initialTask = - vertx.setTimer( - Duration.ofSeconds(initialDelay).toMillis(), - initialId -> { - tasks.get(key).remove(initialId); - periodicArchiver.run(); - long periodicTask = - vertx.setPeriodic( - Duration.ofSeconds( - archivalPeriodSeconds) - .toMillis(), - periodicId -> - periodicArchiver.run()); - ids.add(periodicTask); - }); - ids.add(initialTask); + } catch (Exception e) { + logger.error(e); + return; } - }); + + if (tasks.containsKey(key)) { + tasks.get(key).cancel(false); + } + + PeriodicArchiver periodicArchiver = + periodicArchiverFactory.create( + serviceRef, + credentialsManager, + rule, + recordingArchiveHelper, + this::archivalFailureHandler); + int initialDelay = rule.getInitialDelaySeconds(); + int archivalPeriodSeconds = rule.getArchivalPeriodSeconds(); + if (initialDelay <= 0) { + initialDelay = archivalPeriodSeconds; + } + if (rule.getPreservedArchives() <= 0 || archivalPeriodSeconds <= 0) { + return; + } + Future task = + executor.scheduleAtFixedRate( + periodicArchiver::run, + initialDelay, + archivalPeriodSeconds, + TimeUnit.SECONDS); + tasks.put(key, task); + } + } catch (ScriptException e) { + logger.error(e); + } + }); } private void deactivate(Rule rule, ServiceRef serviceRef) { @@ -317,25 +314,29 @@ private void deactivate(Rule rule, ServiceRef serviceRef) { if (serviceRef != null) { logger.trace("Deactivating rules for {}", serviceRef.getServiceUri()); } - Iterator, Set>> it = tasks.entrySet().iterator(); + Iterator, Future>> it = tasks.entrySet().iterator(); while (it.hasNext()) { - Map.Entry, Set> entry = it.next(); - boolean sameRule = Objects.equals(entry.getKey().getRight(), rule); - boolean sameTarget = Objects.equals(entry.getKey().getLeft(), serviceRef); + Map.Entry, Future> entry = it.next(); + Pair key = entry.getKey(); + Future task = entry.getValue(); + boolean sameTarget = + serviceRef != null && Objects.equals(key.getLeft(), serviceRef.getJvmId()); + boolean sameRule = Objects.equals(key.getRight(), rule); if (sameRule || sameTarget) { - Set ids = entry.getValue(); - ids.forEach( - (id) -> { - vertx.cancelTimer(id); - logger.trace("Cancelled timer {}", id); - }); - it.remove(); + try { + it.remove(); + task.cancel(false); + } catch (Exception e) { + logger.error(e); + } } } } - private Void archivalFailureHandler(Pair key) { - tasks.get(key).forEach(vertx::cancelTimer); + private Void archivalFailureHandler(Pair key) { + if (tasks.containsKey(key)) { + tasks.get(key).cancel(false); + } tasks.remove(key); return null; } @@ -365,11 +366,20 @@ private void archiveRuleRecording(ConnectionDescriptor connectionDescriptor, Rul } } - private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule rule) { + private boolean startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule rule) { CompletableFuture future = targetConnectionManager.executeConnectedTaskAsync( connectionDescriptor, connection -> { + Optional opt = + recordingTargetHelper.getDescriptorByName( + connection, rule.getRecordingName()); + if (opt.isPresent()) { + if (RecordingState.RUNNING.equals(opt.get().getState())) { + return null; + } + } + RecordingOptionsBuilder builder = recordingOptionsBuilderFactory .create(connection.getService()) @@ -384,7 +394,7 @@ private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule RecordingTargetHelper.parseEventSpecifierToTemplate( rule.getEventSpecifier()); return recordingTargetHelper.startRecording( - ReplacementPolicy.ALWAYS, + ReplacementPolicy.STOPPED, connectionDescriptor, builder.build(), template.getLeft(), @@ -393,11 +403,14 @@ private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule false); }); try { - future.handleAsync( + return future.handleAsync( (recording, throwable) -> { if (throwable != null) { logger.error(new RuleException(throwable)); - return null; + return false; + } + if (recording == null) { + return false; } try { Map labels = @@ -415,11 +428,12 @@ private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule } catch (IOException ioe) { logger.error(ioe); } - return null; + return true; }) .get(); } catch (InterruptedException | ExecutionException e) { logger.error(new RuleException(e)); + return false; } } } diff --git a/src/main/java/io/cryostat/rules/RulesModule.java b/src/main/java/io/cryostat/rules/RulesModule.java index 2623143a90..6ba72f8d7a 100644 --- a/src/main/java/io/cryostat/rules/RulesModule.java +++ b/src/main/java/io/cryostat/rules/RulesModule.java @@ -19,6 +19,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.concurrent.Executors; import java.util.function.Function; import javax.inject.Named; @@ -114,7 +115,6 @@ static MatchExpressionEvaluator provideMatchExpressionEvaluator( @Provides @Singleton static RuleProcessor provideRuleProcessor( - Vertx vertx, DiscoveryStorage storage, RuleRegistry registry, CredentialsManager credentialsManager, @@ -126,7 +126,7 @@ static RuleProcessor provideRuleProcessor( PeriodicArchiverFactory periodicArchiverFactory, Logger logger) { return new RuleProcessor( - vertx, + Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2), storage, registry, credentialsManager, diff --git a/src/test/java/io/cryostat/FakeScheduledExecutorService.java b/src/test/java/io/cryostat/FakeScheduledExecutorService.java new file mode 100644 index 0000000000..37d667ab3d --- /dev/null +++ b/src/test/java/io/cryostat/FakeScheduledExecutorService.java @@ -0,0 +1,91 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat; + +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class FakeScheduledExecutorService extends DirectExecutorService + implements ScheduledExecutorService { + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return wrap(submit(command)); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return wrap(submit(callable)); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + return wrap(submit(command)); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit) { + return wrap(submit(command)); + } + + private ScheduledFuture wrap(Future delegate) { + return new ScheduledFuture() { + @Override + public long getDelay(TimeUnit unit) { + return Long.MAX_VALUE; + } + + @Override + public int compareTo(Delayed o) { + return 0; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } + }; + } +} diff --git a/src/test/java/io/cryostat/discovery/DiscoveryStorageTest.java b/src/test/java/io/cryostat/discovery/DiscoveryStorageTest.java index 703b9f6c15..d8a6519f74 100644 --- a/src/test/java/io/cryostat/discovery/DiscoveryStorageTest.java +++ b/src/test/java/io/cryostat/discovery/DiscoveryStorageTest.java @@ -29,12 +29,14 @@ import javax.inject.Singleton; import io.cryostat.DirectExecutorService; +import io.cryostat.FakeScheduledExecutorService; import io.cryostat.MainModule; import io.cryostat.MockVertx; import io.cryostat.VerticleDeployer; import io.cryostat.configuration.CredentialsManager; import io.cryostat.core.log.Logger; import io.cryostat.core.net.discovery.JvmDiscoveryClient.EventKind; +import io.cryostat.core.sys.Clock; import io.cryostat.discovery.DiscoveryStorage.NotFoundException; import io.cryostat.platform.ServiceRef; import io.cryostat.platform.TargetDiscoveryEvent; @@ -83,6 +85,7 @@ class DiscoveryStorageTest { @Mock CredentialsManager credentialsManager; @Mock MatchExpressionEvaluator matchExpressionEvaluator; @Mock WebClient http; + @Mock Clock clock; @Mock Logger logger; Vertx vertx = MockVertx.vertx(); Gson gson = MainModule.provideGson(logger); @@ -107,6 +110,7 @@ void setup() { this.storage = new DiscoveryStorage( deployer, + new FakeScheduledExecutorService(), new DirectExecutorService(), Duration.ofMinutes(5), () -> builtin, @@ -116,6 +120,7 @@ void setup() { () -> matchExpressionEvaluator, gson, http, + clock, logger); this.storage.init(vertx, null); } @@ -203,13 +208,11 @@ void removesPluginsIfCallbackRejected() throws Exception { storage.start(p); f.join(); - Mockito.verify(dao).delete(plugin.getId()); + Mockito.verify(dao, Mockito.times(2)).delete(plugin.getId()); } @Test void removesPluginsIfCallbackFails() throws Exception { - Mockito.when(deployer.deploy(Mockito.any(), Mockito.anyBoolean())) - .thenReturn(Future.succeededFuture()); EnvironmentNode realm = new EnvironmentNode("realm", BaseNodeType.REALM, Map.of(), Set.of()); PluginInfo plugin = @@ -232,7 +235,6 @@ void removesPluginsIfCallbackFails() throws Exception { Mockito.when(req.timeout(Mockito.anyLong())).thenReturn(req); Mockito.when(req.followRedirects(Mockito.anyBoolean())).thenReturn(req); - HttpResponse res = Mockito.mock(HttpResponse.class); Future> future = Future.failedFuture("test failure"); Mockito.when(req.send()).thenReturn(future); @@ -242,7 +244,7 @@ void removesPluginsIfCallbackFails() throws Exception { storage.start(p); f.join(); - Mockito.verify(dao).delete(plugin.getId()); + Mockito.verify(dao, Mockito.times(2)).delete(plugin.getId()); } @Test @@ -697,57 +699,6 @@ void listsAllTreeLeaves() { MatcherAssert.assertThat(servicesList, Matchers.hasSize(4)); MatcherAssert.assertThat(servicesList, Matchers.containsInAnyOrder(sr1, sr2, sr3, sr4)); } - - @Test - void listsUniqueNonNullTreeLeaves() throws Exception { - ServiceRef sr1 = - new ServiceRef( - null, URI.create("service:jmx:rmi:///jndi/rmi://leaf:1/jmxrmi"), "sr1"); - TargetNode leaf1 = new TargetNode(BaseNodeType.JVM, sr1); - ServiceRef sr2 = - new ServiceRef( - "same-id", - URI.create("service:jmx:rmi:///jndi/rmi://leaf:2/jmxrmi"), - "sr2"); - TargetNode leaf2 = new TargetNode(BaseNodeType.JVM, sr2); - EnvironmentNode realm1 = - new EnvironmentNode( - DefaultPlatformClient.REALM, - BaseNodeType.REALM, - Map.of(), - Set.of(leaf1, leaf2)); - PluginInfo plugin1 = new PluginInfo(); - plugin1.setSubtree(gson.toJson(realm1)); - - ServiceRef sr3 = - new ServiceRef( - "same-id", - URI.create("service:jmx:rmi:///jndi/rmi://leaf:3/jmxrmi"), - "sr3"); - TargetNode leaf3 = new TargetNode(BaseNodeType.JVM, sr3); - ServiceRef sr4 = - new ServiceRef( - "other-id", - URI.create("service:jmx:rmi:///jndi/rmi://leaf:4/jmxrmi"), - "sr4"); - TargetNode leaf4 = new TargetNode(BaseNodeType.JVM, sr4); - EnvironmentNode realm2 = - new EnvironmentNode( - CustomTargetPlatformClient.REALM, - BaseNodeType.REALM, - Map.of(), - Set.of(leaf3, leaf4)); - PluginInfo plugin2 = new PluginInfo(); - plugin2.setSubtree(gson.toJson(realm2)); - - Mockito.when(dao.getAll()).thenReturn(List.of(plugin1, plugin2)); - - List servicesList = storage.listUniqueReachableServices(); - - MatcherAssert.assertThat(servicesList, Matchers.hasSize(2)); - MatcherAssert.assertThat(servicesList, Matchers.containsInRelativeOrder(sr2, sr4)); - // sr2 over sr3 because RealmOrder of "JDP" has higher priority than "Custom Targets" - } } @Nested diff --git a/src/test/java/io/cryostat/net/TargetConnectionManagerTest.java b/src/test/java/io/cryostat/net/TargetConnectionManagerTest.java index 05beba2d6f..c1171d2397 100644 --- a/src/test/java/io/cryostat/net/TargetConnectionManagerTest.java +++ b/src/test/java/io/cryostat/net/TargetConnectionManagerTest.java @@ -63,6 +63,7 @@ void setup() { Scheduler.disabledScheduler(), TTL, -1, + 10, logger); } @@ -156,6 +157,7 @@ void shouldCreateNewConnectionForAccessDelayedLongerThanTTL() throws Exception { Scheduler.systemScheduler(), Duration.ofNanos(1), 1, + 10, logger); Mockito.when(jfrConnectionToolkit.connect(Mockito.any(), Mockito.any(), Mockito.any())) .thenAnswer( @@ -185,6 +187,7 @@ void shouldCreateNewConnectionPerTarget() throws Exception { Scheduler.disabledScheduler(), Duration.ofNanos(1), -1, + 10, logger); Mockito.when(jfrConnectionToolkit.connect(Mockito.any(), Mockito.any(), Mockito.any())) .thenAnswer( @@ -223,6 +226,7 @@ void shouldConnectToAgents(String url) throws Exception { Scheduler.disabledScheduler(), Duration.ofNanos(1), -1, + 10, logger); ConnectionDescriptor desc = new ConnectionDescriptor(url); JFRConnection conn = mgr.executeConnectedTask(desc, a -> a); diff --git a/src/test/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandlerTest.java b/src/test/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandlerTest.java index 4ea311a12f..72307e892c 100644 --- a/src/test/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandlerTest.java +++ b/src/test/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandlerTest.java @@ -186,7 +186,7 @@ void shouldRespondWith404ForNonexistentRule() throws Exception { Mockito.verify(registry, Mockito.never()).deleteRule(Mockito.anyString()); Mockito.verify(registry, Mockito.never()).applies(Mockito.any(), Mockito.any()); Mockito.verify(recordingTargetHelper, Mockito.never()) - .stopRecording(Mockito.any(), Mockito.any()); + .stopRecording(Mockito.any(), Mockito.any(), Mockito.eq(true)); } @Test @@ -211,11 +211,13 @@ void shouldRespondWith200ForCleanupFailures() throws Exception { "id", new URI("service:jmx:rmi:///jndi/rmi://cryostat:9091/jmxrmi"), "io.cryostat.Cryostat"); - Mockito.when(storage.listUniqueReachableServices()).thenReturn(List.of(serviceRef)); + Mockito.when(storage.listDiscoverableServices()).thenReturn(List.of(serviceRef)); FlightRecorderException exception = new FlightRecorderException(new Exception("test message")); - Mockito.when(recordingTargetHelper.stopRecording(Mockito.any(), Mockito.any())) + Mockito.when( + recordingTargetHelper.stopRecording( + Mockito.any(), Mockito.any(), Mockito.eq(true))) .thenThrow(exception); IntermediateResponse response = handler.handle(params); @@ -225,7 +227,8 @@ void shouldRespondWith200ForCleanupFailures() throws Exception { Mockito.verify(registry).deleteRule(rule); Mockito.verify(registry).applies(rule, serviceRef); Mockito.verify(recordingTargetHelper) - .stopRecording(Mockito.any(), Mockito.eq(rule.getRecordingName())); + .stopRecording( + Mockito.any(), Mockito.eq(rule.getRecordingName()), Mockito.eq(true)); Mockito.verify(logger).error(exception); } @@ -245,7 +248,7 @@ void shouldRespondWith200AfterCleanupNoop() throws Exception { Mockito.when(registry.hasRuleByName(testRuleName)).thenReturn(true); Mockito.when(registry.getRule(testRuleName)).thenReturn(Optional.of(rule)); - Mockito.when(storage.listUniqueReachableServices()).thenReturn(List.of()); + Mockito.when(storage.listDiscoverableServices()).thenReturn(List.of()); IntermediateResponse response = handler.handle(params); MatcherAssert.assertThat(response.getStatusCode(), Matchers.equalTo(200)); @@ -254,7 +257,8 @@ void shouldRespondWith200AfterCleanupNoop() throws Exception { Mockito.verify(registry).deleteRule(rule); Mockito.verify(registry, Mockito.never()).applies(Mockito.any(), Mockito.any()); Mockito.verify(recordingTargetHelper, Mockito.never()) - .stopRecording(Mockito.any(), Mockito.eq(rule.getRecordingName())); + .stopRecording( + Mockito.any(), Mockito.eq(rule.getRecordingName()), Mockito.eq(true)); } @Test @@ -279,7 +283,7 @@ void shouldRespondWith200AfterSuccessfulCleanup() throws Exception { "id", new URI("service:jmx:rmi:///jndi/rmi://cryostat:9091/jmxrmi"), "io.cryostat.Cryostat"); - Mockito.when(storage.listUniqueReachableServices()).thenReturn(List.of(serviceRef)); + Mockito.when(storage.listDiscoverableServices()).thenReturn(List.of(serviceRef)); IntermediateResponse response = handler.handle(params); MatcherAssert.assertThat(response.getStatusCode(), Matchers.equalTo(200)); @@ -288,7 +292,8 @@ void shouldRespondWith200AfterSuccessfulCleanup() throws Exception { Mockito.verify(registry).deleteRule(rule); Mockito.verify(registry).applies(Mockito.any(), Mockito.any()); Mockito.verify(recordingTargetHelper) - .stopRecording(Mockito.any(), Mockito.eq(rule.getRecordingName())); + .stopRecording( + Mockito.any(), Mockito.eq(rule.getRecordingName()), Mockito.eq(true)); } } } diff --git a/src/test/java/io/cryostat/net/web/http/api/v2/RulePatchHandlerTest.java b/src/test/java/io/cryostat/net/web/http/api/v2/RulePatchHandlerTest.java index 240be4ba6f..afb680163b 100644 --- a/src/test/java/io/cryostat/net/web/http/api/v2/RulePatchHandlerTest.java +++ b/src/test/java/io/cryostat/net/web/http/api/v2/RulePatchHandlerTest.java @@ -207,7 +207,7 @@ void shouldDisableRuleAndCleanup() throws Exception { "id", new URI("service:jmx:rmi:///jndi/rmi://cryostat:9091/jmxrmi"), "io.cryostat.Cryostat"); - Mockito.when(storage.listUniqueReachableServices()).thenReturn(List.of(serviceRef)); + Mockito.when(storage.listDiscoverableServices()).thenReturn(List.of(serviceRef)); IntermediateResponse response = handler.handle(params); @@ -226,7 +226,8 @@ void shouldDisableRuleAndCleanup() throws Exception { Mockito.verify(recordingTargetHelper) .stopRecording( Mockito.any(ConnectionDescriptor.class), - Mockito.eq(rule.getRecordingName())); + Mockito.eq(rule.getRecordingName()), + Mockito.eq(true)); MatcherAssert.assertThat(response.getStatusCode(), Matchers.equalTo(204)); } diff --git a/src/test/java/io/cryostat/recordings/RecordingArchiveHelperTest.java b/src/test/java/io/cryostat/recordings/RecordingArchiveHelperTest.java index 492106b4d0..5ce61543e0 100644 --- a/src/test/java/io/cryostat/recordings/RecordingArchiveHelperTest.java +++ b/src/test/java/io/cryostat/recordings/RecordingArchiveHelperTest.java @@ -1165,12 +1165,15 @@ public String answer(InvocationOnMock invocation) throws Throwable { void shouldGetArchivedTimeFromTimestamp() { // December 19, 2019 | 8:38:34 PM UTC String timestamp = "20191219T213834Z"; - Long time = recordingArchiveHelper.getArchivedTimeFromTimestamp(timestamp); + long time = recordingArchiveHelper.getArchivedTimeFromTimestamp(timestamp); long expectedArchivedTime = Instant.parse("2019-12-19T21:38:34.00Z").toEpochMilli(); MatcherAssert.assertThat(time, Matchers.equalTo(expectedArchivedTime)); String invalid = ""; - Long invalidTime = recordingArchiveHelper.getArchivedTimeFromTimestamp(invalid); - MatcherAssert.assertThat(invalidTime, Matchers.equalTo(Instant.now().toEpochMilli())); + long invalidTime = recordingArchiveHelper.getArchivedTimeFromTimestamp(invalid); + long tolerance = 10; + long now = Instant.now().toEpochMilli(); + MatcherAssert.assertThat(invalidTime, Matchers.lessThanOrEqualTo(now + tolerance)); + MatcherAssert.assertThat(invalidTime, Matchers.greaterThanOrEqualTo(now - tolerance)); } } diff --git a/src/test/java/io/cryostat/rules/RuleProcessorTest.java b/src/test/java/io/cryostat/rules/RuleProcessorTest.java index bb47851bac..bcdc5c6a80 100644 --- a/src/test/java/io/cryostat/rules/RuleProcessorTest.java +++ b/src/test/java/io/cryostat/rules/RuleProcessorTest.java @@ -15,12 +15,13 @@ */ package io.cryostat.rules; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; - import java.net.URI; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.openjdk.jmc.common.unit.IConstrainedMap; @@ -28,7 +29,7 @@ import org.openjdk.jmc.rjmx.services.jfr.IFlightRecorderService; import org.openjdk.jmc.rjmx.services.jfr.IRecordingDescriptor; -import io.cryostat.MockVertx; +import io.cryostat.FakeScheduledExecutorService; import io.cryostat.configuration.CredentialsManager; import io.cryostat.configuration.CredentialsManager.CredentialsEvent; import io.cryostat.core.log.Logger; @@ -50,8 +51,6 @@ import io.cryostat.util.events.Event; import io.cryostat.util.events.EventListener; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; import org.apache.commons.lang3.tuple.Pair; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -70,7 +69,7 @@ class RuleProcessorTest { RuleProcessor processor; - Vertx vertx; + FakeScheduledExecutorService executor; @Mock PlatformClient platformClient; @Mock RuleRegistry registry; @Mock CredentialsManager credentialsManager; @@ -87,10 +86,10 @@ class RuleProcessorTest { @BeforeEach void setup() { - this.vertx = MockVertx.vertx(); + this.executor = Mockito.spy(new FakeScheduledExecutorService()); this.processor = new RuleProcessor( - vertx, + executor, platformClient, registry, credentialsManager, @@ -172,6 +171,28 @@ void testSuccessfulRuleActivationWithCredentials() throws Exception { Mockito.when(registry.getRules(serviceRef)).thenReturn(Set.of(rule)); + IRecordingDescriptor autoRule = Mockito.mock(IRecordingDescriptor.class); + + Mockito.when(recordingTargetHelper.getDescriptorByName(Mockito.any(), Mockito.any())) + .thenReturn(Optional.empty()) + .thenReturn(Optional.of(autoRule)); + + Mockito.when( + recordingTargetHelper.startRecording( + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.anyBoolean())) + .thenReturn(autoRule); + + Metadata metadata = new Metadata(Map.of()); + + Mockito.when(metadataManager.getMetadata(Mockito.any(), Mockito.any())) + .thenReturn(metadata); + PeriodicArchiver periodicArchiver = Mockito.mock(PeriodicArchiver.class); Mockito.when( periodicArchiverFactory.create( @@ -217,7 +238,7 @@ void testSuccessfulRuleActivationWithCredentials() throws Exception { archiveOnStopCaptor.capture()); MatcherAssert.assertThat( - replaceCaptor.getValue(), Matchers.equalTo(ReplacementPolicy.ALWAYS)); + replaceCaptor.getValue(), Matchers.equalTo(ReplacementPolicy.STOPPED)); ConnectionDescriptor connectionDescriptor = connectionDescriptorCaptor.getValue(); MatcherAssert.assertThat( @@ -235,16 +256,16 @@ void testSuccessfulRuleActivationWithCredentials() throws Exception { MatcherAssert.assertThat(metadataCaptor.getValue(), Matchers.equalTo(new Metadata())); - ArgumentCaptor> handlerCaptor = ArgumentCaptor.forClass(Handler.class); - Mockito.verify(vertx).setTimer(Mockito.eq(67_000L), handlerCaptor.capture()); + ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Runnable.class); + Mockito.verify(executor) + .scheduleAtFixedRate( + handlerCaptor.capture(), + Mockito.eq((long) rule.getInitialDelaySeconds()), + Mockito.eq((long) rule.getArchivalPeriodSeconds()), + Mockito.eq(TimeUnit.SECONDS)); - Mockito.verify(periodicArchiver, Mockito.times(0)).run(); - handlerCaptor.getValue().handle(1234L); Mockito.verify(periodicArchiver, Mockito.times(1)).run(); - - Mockito.verify(vertx).setPeriodic(Mockito.eq(67_000L), handlerCaptor.capture()); - - handlerCaptor.getValue().handle(1234L); + handlerCaptor.getValue().run(); Mockito.verify(periodicArchiver, Mockito.times(2)).run(); } @@ -334,9 +355,56 @@ void testTaskCancellationOnFailure() throws Exception { .archivalPeriodSeconds(67) .build(); + RecordingOptionsBuilder recordingOptionsBuilder = + Mockito.mock(RecordingOptionsBuilder.class); + Mockito.when(recordingOptionsBuilder.name(Mockito.any())) + .thenReturn(recordingOptionsBuilder); + Mockito.when(recordingOptionsBuilder.toDisk(Mockito.anyBoolean())) + .thenReturn(recordingOptionsBuilder); + Mockito.when(recordingOptionsBuilder.maxAge(Mockito.anyLong())) + .thenReturn(recordingOptionsBuilder); + Mockito.when(recordingOptionsBuilder.maxSize(Mockito.anyLong())) + .thenReturn(recordingOptionsBuilder); + Mockito.when(recordingOptionsBuilderFactory.create(Mockito.any())) + .thenReturn(recordingOptionsBuilder); + IConstrainedMap recordingOptions = Mockito.mock(IConstrainedMap.class); + Mockito.when(recordingOptionsBuilder.build()).thenReturn(recordingOptions); + + Mockito.when( + targetConnectionManager.executeConnectedTaskAsync( + Mockito.any(), Mockito.any())) + .thenAnswer( + arg0 -> + CompletableFuture.completedFuture( + ((TargetConnectionManager.ConnectedTask) + arg0.getArgument(1)) + .execute(connection))); + Mockito.when(registry.getRules(serviceRef)).thenReturn(Set.of(rule)); - PeriodicArchiver periodicArchiver = Mockito.mock(PeriodicArchiver.class); + IRecordingDescriptor autoRule = Mockito.mock(IRecordingDescriptor.class); + + Mockito.when(recordingTargetHelper.getDescriptorByName(Mockito.any(), Mockito.any())) + .thenReturn(Optional.empty()) + .thenReturn(Optional.of(autoRule)); + + Mockito.when( + recordingTargetHelper.startRecording( + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.anyBoolean())) + .thenReturn(autoRule); + + Metadata metadata = new Metadata(Map.of()); + + Mockito.when(metadataManager.getMetadata(Mockito.any(), Mockito.any())) + .thenReturn(metadata); + + PeriodicArchiver[] pa = new PeriodicArchiver[1]; Mockito.when( periodicArchiverFactory.create( Mockito.any(), @@ -344,27 +412,50 @@ void testTaskCancellationOnFailure() throws Exception { Mockito.any(), Mockito.any(), Mockito.any())) - .thenReturn(periodicArchiver); + .thenAnswer( + new Answer() { + @Override + public PeriodicArchiver answer(InvocationOnMock invocation) + throws Throwable { + CredentialsManager cm = invocation.getArgument(1); + Function, Void> fn = invocation.getArgument(4); + PeriodicArchiver p = + new PeriodicArchiver( + serviceRef, + cm, + rule, + recordingArchiveHelper, + fn, + logger); + pa[0] = p; + return p; + } + }); + + ScheduledFuture task = Mockito.mock(ScheduledFuture.class); + Mockito.doReturn(task) + .when(executor) + .scheduleAtFixedRate( + Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any()); processor.accept(tde); - Mockito.verify(vertx).setTimer(Mockito.eq(67_000L), Mockito.any()); + Mockito.verify(executor) + .scheduleAtFixedRate( + Mockito.any(), + Mockito.eq((long) rule.getInitialDelaySeconds()), + Mockito.eq((long) rule.getArchivalPeriodSeconds()), + Mockito.eq(TimeUnit.SECONDS)); - ArgumentCaptor, Void>> functionCaptor = - ArgumentCaptor.forClass(Function.class); Mockito.verify(periodicArchiverFactory) - .create( - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.any(), - functionCaptor.capture()); - Function, Void> failureFunction = functionCaptor.getValue(); - Mockito.verify(vertx, Mockito.never()).cancelTimer(MockVertx.TIMER_ID); + .create(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.verify(task, Mockito.never()).cancel(Mockito.anyBoolean()); - failureFunction.apply(Pair.of(serviceRef, rule)); + Mockito.when(recordingArchiveHelper.getRecordings(Mockito.any())) + .thenReturn(CompletableFuture.failedFuture(new SecurityException())); + pa[0].run(); - Mockito.verify(vertx).cancelTimer(MockVertx.TIMER_ID); + Mockito.verify(task).cancel(false); } @Test @@ -472,7 +563,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { archiveOnStopCaptor.capture()); MatcherAssert.assertThat( - replaceCaptor.getValue(), Matchers.equalTo(ReplacementPolicy.ALWAYS)); + replaceCaptor.getValue(), Matchers.equalTo(ReplacementPolicy.STOPPED)); IConstrainedMap actualRecordingOptions = recordingOptionsCaptor.getValue(); MatcherAssert.assertThat(actualRecordingOptions, Matchers.sameInstance(recordingOptions)); @@ -511,7 +602,7 @@ void testSuccessfulRuleNonActivationWithCredentials() throws Exception { processor.accept(tde); - Mockito.verify(recordingOptionsBuilder, never()).name("auto_Test_Rule"); + Mockito.verify(recordingOptionsBuilder, Mockito.never()).name("auto_Test_Rule"); ArgumentCaptor replaceCaptor = ArgumentCaptor.forClass(ReplacementPolicy.class); @@ -531,7 +622,7 @@ void testSuccessfulRuleNonActivationWithCredentials() throws Exception { ArgumentCaptor archiveOnStopCaptor = ArgumentCaptor.forClass(Boolean.class); - Mockito.verify(recordingTargetHelper, never()) + Mockito.verify(recordingTargetHelper, Mockito.never()) .startRecording( replaceCaptor.capture(), connectionDescriptorCaptor.capture(), diff --git a/src/test/java/itest/AutoRulesCleanupIT.java b/src/test/java/itest/AutoRulesCleanupIT.java index a0a555d7ee..0bb77713b8 100644 --- a/src/test/java/itest/AutoRulesCleanupIT.java +++ b/src/test/java/itest/AutoRulesCleanupIT.java @@ -201,6 +201,8 @@ void testAddRule() throws TimeoutException, InterruptedException, ExecutionExcep @Order(2) void testAddRuleTriggersRecordingCreation() throws TimeoutException, InterruptedException, ExecutionException { + Thread.sleep(3_000); // Wait for rule to trigger + CompletableFuture future = new CompletableFuture<>(); webClient .get(String.format("/api/v1/targets/%s/recordings", jmxServiceUrlEncoded))