From 5f578f93c9ac3bf9dd25f5e7072f24fc7306184d Mon Sep 17 00:00:00 2001
From: Andrew Azores <aazores@redhat.com>
Date: Mon, 18 Sep 2023 14:51:25 -0400
Subject: [PATCH] fix(rules): rule triggering on late-connecting targets
 (#1646)

* tolerate MODIFIED JVM discovery events

* handle target MODIFIED discovery event by re-triggering actions

* rule triggering only replaces STOPPED recordings

* refactoring, parallel execution, handle MODIFIED events that may occur externally

* retry backoff and timeout on nonconnectable targets

* forget about non-connectable targets if they get LOST in the meantime

* only apply timeout/backoff logic to timer-based reconnection attempts, not credentials or discovery driven events

* process rules on dedicated executor pool, not Vertx pool

* perform rule triggering when credentials are added on executor pool

* evaluate rule cleanup against all discovered targets, and be tolerant of already-stopped condition

* attempt rule activation against all targets and handle potential for activation already done on another duplicate target definition

* eliminate bugged method

* use ScheduledExecutorService for background rules processing

* use multithreaded pool

* re-trigger rules on credential removal

* track rule activations using JVM IDs, not whole ServiceRefs

* rule uniqueness should not depend on currently enabled state

* perform discovery background tasks using dedicated scheduler thread and worker pool

* try to add more timeout handling to JVM ID retrieval

* add tolerance to flaky wall-time-checking unit test
---
 .../cryostat/discovery/DiscoveryModule.java   |   4 +
 .../cryostat/discovery/DiscoveryStorage.java  | 283 +++++++++++------
 .../java/io/cryostat/net/NetworkModule.java   |   2 +
 .../cryostat/net/TargetConnectionManager.java |  24 +-
 .../web/http/api/v2/RuleDeleteHandler.java    |   4 +-
 .../net/web/http/api/v2/RulePatchHandler.java |   4 +-
 .../io/cryostat/platform/PlatformClient.java  |  13 +-
 .../io/cryostat/recordings/JvmIdHelper.java   |  28 +-
 .../recordings/RecordingMetadataManager.java  |   5 +-
 .../io/cryostat/rules/PeriodicArchiver.java   |   6 +-
 .../rules/PeriodicArchiverFactory.java        |   2 +-
 src/main/java/io/cryostat/rules/Rule.java     |  38 ++-
 .../java/io/cryostat/rules/RuleProcessor.java | 300 +++++++++---------
 .../java/io/cryostat/rules/RulesModule.java   |   4 +-
 .../FakeScheduledExecutorService.java         |  91 ++++++
 .../discovery/DiscoveryStorageTest.java       |  63 +---
 .../net/TargetConnectionManagerTest.java      |   4 +
 .../http/api/v2/RuleDeleteHandlerTest.java    |  21 +-
 .../web/http/api/v2/RulePatchHandlerTest.java |   5 +-
 .../RecordingArchiveHelperTest.java           |   9 +-
 .../io/cryostat/rules/RuleProcessorTest.java  | 163 +++++++---
 src/test/java/itest/AutoRulesCleanupIT.java   |   2 +
 22 files changed, 676 insertions(+), 399 deletions(-)
 create mode 100644 src/test/java/io/cryostat/FakeScheduledExecutorService.java

diff --git a/src/main/java/io/cryostat/discovery/DiscoveryModule.java b/src/main/java/io/cryostat/discovery/DiscoveryModule.java
index 830ffe1697..dfc6289cf8 100644
--- a/src/main/java/io/cryostat/discovery/DiscoveryModule.java
+++ b/src/main/java/io/cryostat/discovery/DiscoveryModule.java
@@ -27,6 +27,7 @@
 import io.cryostat.configuration.CredentialsManager;
 import io.cryostat.configuration.Variables;
 import io.cryostat.core.log.Logger;
+import io.cryostat.core.sys.Clock;
 import io.cryostat.core.sys.Environment;
 import io.cryostat.messaging.notifications.NotificationFactory;
 import io.cryostat.platform.PlatformModule;
@@ -77,9 +78,11 @@ static DiscoveryStorage provideDiscoveryStorage(
             Lazy<MatchExpressionEvaluator> matchExpressionEvaluator,
             Gson gson,
             WebClient http,
+            Clock clock,
             Logger logger) {
         return new DiscoveryStorage(
                 deployer,
+                Executors.newSingleThreadScheduledExecutor(),
                 Executors.newCachedThreadPool(),
                 pingPeriod,
                 builtin,
@@ -89,6 +92,7 @@ static DiscoveryStorage provideDiscoveryStorage(
                 matchExpressionEvaluator,
                 gson,
                 http,
+                clock,
                 logger);
     }
 
diff --git a/src/main/java/io/cryostat/discovery/DiscoveryStorage.java b/src/main/java/io/cryostat/discovery/DiscoveryStorage.java
index 95a28efee2..a32a6a772c 100644
--- a/src/main/java/io/cryostat/discovery/DiscoveryStorage.java
+++ b/src/main/java/io/cryostat/discovery/DiscoveryStorage.java
@@ -28,7 +28,11 @@
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
 import javax.script.ScriptException;
@@ -38,6 +42,7 @@
 import io.cryostat.configuration.StoredCredentials;
 import io.cryostat.core.log.Logger;
 import io.cryostat.core.net.discovery.JvmDiscoveryClient.EventKind;
+import io.cryostat.core.sys.Clock;
 import io.cryostat.platform.ServiceRef;
 import io.cryostat.platform.ServiceRef.AnnotationKey;
 import io.cryostat.platform.discovery.AbstractNode;
@@ -52,23 +57,22 @@
 import com.google.gson.Gson;
 import com.google.gson.JsonSyntaxException;
 import dagger.Lazy;
-import io.vertx.core.CompositeFuture;
-import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.ext.auth.authentication.UsernamePasswordCredentials;
 import io.vertx.ext.web.client.HttpRequest;
-import io.vertx.ext.web.client.HttpResponse;
 import io.vertx.ext.web.client.WebClient;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 
 public class DiscoveryStorage extends AbstractPlatformClientVerticle {
 
     public static final URI NO_CALLBACK = null;
     private final Duration pingPeriod;
     private final VerticleDeployer deployer;
+    private final ScheduledExecutorService scheduler;
     private final ExecutorService executor;
     private final Lazy<BuiltInDiscovery> builtin;
     private final PluginInfoDao dao;
@@ -76,17 +80,20 @@ public class DiscoveryStorage extends AbstractPlatformClientVerticle {
     private final Lazy<CredentialsManager> credentialsManager;
     private final Lazy<MatchExpressionEvaluator> matchExpressionEvaluator;
     private final Gson gson;
+    private final Clock clock;
     private final WebClient http;
     private final Logger logger;
-    private long pluginPruneTimerId = -1L;
-    private long targetRetryTimeId = -1L;
+    private ScheduledFuture<?> pluginPruneTask;
+    private ScheduledFuture<?> targetRetryTask;
 
-    private final Map<TargetNode, UUID> nonConnectableTargets = new HashMap<>();
+    private final Map<Pair<TargetNode, UUID>, ConnectionAttemptRecord> nonConnectableTargets =
+            new ConcurrentHashMap<>();
 
     public static final String DISCOVERY_STARTUP_ADDRESS = "discovery-startup";
 
     DiscoveryStorage(
             VerticleDeployer deployer,
+            ScheduledExecutorService scheduler,
             ExecutorService executor,
             Duration pingPeriod,
             Lazy<BuiltInDiscovery> builtin,
@@ -96,8 +103,10 @@ public class DiscoveryStorage extends AbstractPlatformClientVerticle {
             Lazy<MatchExpressionEvaluator> matchExpressionEvaluator,
             Gson gson,
             WebClient http,
+            Clock clock,
             Logger logger) {
         this.deployer = deployer;
+        this.scheduler = scheduler;
         this.executor = executor;
         this.pingPeriod = pingPeriod;
         this.builtin = builtin;
@@ -107,55 +116,41 @@ public class DiscoveryStorage extends AbstractPlatformClientVerticle {
         this.matchExpressionEvaluator = matchExpressionEvaluator;
         this.gson = gson;
         this.http = http;
+        this.clock = clock;
         this.logger = logger;
     }
 
     @Override
     public void start(Promise<Void> future) throws Exception {
         pingPrune()
-                .onSuccess(
-                        cf ->
-                                deployer.deploy(builtin.get(), true)
-                                        .onSuccess(ar -> future.complete())
-                                        .onFailure(t -> future.fail((Throwable) t))
-                                        .eventually(
-                                                m ->
-                                                        getVertx()
-                                                                .eventBus()
-                                                                .send(
-                                                                        DISCOVERY_STARTUP_ADDRESS,
-                                                                        "Discovery storage"
-                                                                                + " deployed")))
-                .onFailure(future::fail);
-
-        this.pluginPruneTimerId = getVertx().setPeriodic(pingPeriod.toMillis(), i -> pingPrune());
-        this.targetRetryTimeId =
-                getVertx()
-                        .setPeriodic(
-                                // TODO make this configurable, use an exponential backoff, have a
-                                // maximum retry policy, etc.
-                                15_000,
-                                i -> {
-                                    testNonConnectedTargets(
-                                            entry -> {
-                                                TargetNode targetNode = entry.getKey();
-                                                try {
-                                                    String id =
-                                                            jvmIdHelper
-                                                                    .get()
-                                                                    .resolveId(
-                                                                            targetNode.getTarget())
-                                                                    .getJvmId();
-                                                    return StringUtils.isNotBlank(id);
-                                                } catch (JvmIdGetException e) {
-                                                    logger.info(
-                                                            "Retain null jvmId for node [{}]",
-                                                            targetNode.getName());
-                                                    logger.info(e);
-                                                    return false;
-                                                }
-                                            });
-                                });
+                .whenComplete(
+                        (v, ex) -> {
+                            if (ex != null) {
+                                future.fail(ex);
+                                return;
+                            }
+                            deployer.deploy(builtin.get(), true)
+                                    .onSuccess(ar -> future.complete())
+                                    .onFailure(t -> future.fail((Throwable) t))
+                                    .eventually(
+                                            m ->
+                                                    getVertx()
+                                                            .eventBus()
+                                                            .send(
+                                                                    DISCOVERY_STARTUP_ADDRESS,
+                                                                    "Discovery storage deployed"));
+                        });
+
+        this.pluginPruneTask =
+                scheduler.scheduleAtFixedRate(
+                        this::pingPrune,
+                        pingPeriod.toMillis(),
+                        pingPeriod.toMillis(),
+                        TimeUnit.MILLISECONDS);
+        // TODO make this configurable
+        this.targetRetryTask =
+                scheduler.scheduleAtFixedRate(
+                        this::checkNonConnectedTargetJvmIds, 2, 2, TimeUnit.SECONDS);
         this.credentialsManager
                 .get()
                 .addListener(
@@ -165,11 +160,14 @@ public void start(Promise<Void> future) throws Exception {
                                     testNonConnectedTargets(
                                             entry -> {
                                                 try {
-                                                    return matchExpressionEvaluator
-                                                            .get()
-                                                            .applies(
-                                                                    event.getPayload(),
-                                                                    entry.getKey().getTarget());
+                                                    ServiceRef target = entry.getKey().getTarget();
+                                                    boolean credentialsApply =
+                                                            matchExpressionEvaluator
+                                                                    .get()
+                                                                    .applies(
+                                                                            event.getPayload(),
+                                                                            target);
+                                                    return credentialsApply && testJvmId(target);
                                                 } catch (ScriptException e) {
                                                     logger.error(e);
                                                     return false;
@@ -183,17 +181,79 @@ public void start(Promise<Void> future) throws Exception {
                                             event.getEventType().toString());
                             }
                         });
+
+        this.addTargetDiscoveryListener(
+                tde -> {
+                    switch (tde.getEventKind()) {
+                        case MODIFIED:
+                            testNonConnectedTargets(
+                                    entry ->
+                                            Objects.equals(
+                                                    tde.getServiceRef(),
+                                                    entry.getKey().getTarget()));
+                            break;
+                        case LOST:
+                            var it = nonConnectableTargets.entrySet().iterator();
+                            while (it.hasNext()) {
+                                var entry = it.next();
+                                if (Objects.equals(
+                                        tde.getServiceRef(), entry.getKey().getKey().getTarget())) {
+                                    it.remove();
+                                }
+                            }
+                            break;
+                        default:
+                            break;
+                    }
+                });
+    }
+
+    private void checkNonConnectedTargetJvmIds() {
+        testNonConnectedTargets(
+                entry -> {
+                    TargetNode targetNode = entry.getKey();
+                    ConnectionAttemptRecord attemptRecord = nonConnectableTargets.get(entry);
+                    // TODO make this configurable, use an exponential backoff, have a
+                    // maximum retry policy, etc.
+                    long nextAttempt =
+                            (attemptRecord.attemptCount * attemptRecord.attemptCount)
+                                    + attemptRecord.lastAttemptTimestamp;
+                    attemptRecord.attemptCount++;
+                    long now = clock.now().getEpochSecond();
+                    if (now < nextAttempt) {
+                        return false;
+                    }
+                    long elapsed =
+                            attemptRecord.lastAttemptTimestamp
+                                    - attemptRecord.firstAttemptTimestamp;
+                    if (elapsed > ConnectionAttemptRecord.MAX_ATTEMPT_INTERVAL) {
+                        return false;
+                    }
+                    return testJvmId(targetNode.getTarget());
+                });
+    }
+
+    private boolean testJvmId(ServiceRef serviceRef) {
+        try {
+            String id = jvmIdHelper.get().resolveId(serviceRef).getJvmId();
+            return StringUtils.isNotBlank(id);
+        } catch (JvmIdGetException e) {
+            logger.trace("Retain null jvmId for target [{}]", serviceRef.getServiceUri());
+            logger.trace(e);
+            return false;
+        }
     }
 
     private void testNonConnectedTargets(Predicate<Entry<TargetNode, UUID>> predicate) {
-        executor.execute(
-                () -> {
-                    Map<TargetNode, UUID> copy = new HashMap<>(nonConnectableTargets);
-                    for (var entry : copy.entrySet()) {
+        Map<Pair<TargetNode, UUID>, ConnectionAttemptRecord> copy =
+                new HashMap<>(nonConnectableTargets);
+        for (var entry : copy.entrySet()) {
+            executor.submit(
+                    () -> {
                         try {
-                            if (predicate.test(entry)) {
+                            if (predicate.test(entry.getKey())) {
                                 nonConnectableTargets.remove(entry.getKey());
-                                UUID id = entry.getValue();
+                                UUID id = entry.getKey().getValue();
                                 PluginInfo plugin = getById(id).orElseThrow();
                                 EnvironmentNode original =
                                         gson.fromJson(plugin.getSubtree(), EnvironmentNode.class);
@@ -202,39 +262,42 @@ private void testNonConnectedTargets(Predicate<Entry<TargetNode, UUID>> predicat
                         } catch (JsonSyntaxException e) {
                             throw new RuntimeException(e);
                         }
-                    }
-                });
+                    });
+        }
     }
 
     @Override
     public void stop() {
-        getVertx().cancelTimer(pluginPruneTimerId);
-        getVertx().cancelTimer(targetRetryTimeId);
+        if (this.pluginPruneTask != null) {
+            this.pluginPruneTask.cancel(false);
+        }
+        if (this.targetRetryTask != null) {
+            this.targetRetryTask.cancel(false);
+        }
     }
 
-    private CompositeFuture pingPrune() {
-        List<Future> futures =
+    private CompletableFuture<?> pingPrune() {
+        List<CompletableFuture<Boolean>> futures =
                 dao.getAll().stream()
                         .map(
                                 plugin -> {
                                     UUID key = plugin.getId();
                                     URI uri = plugin.getCallback();
-                                    return (Future)
-                                            ping(HttpMethod.POST, uri)
-                                                    .onSuccess(
-                                                            res -> {
-                                                                if (!Boolean.TRUE.equals(res)) {
-                                                                    removePlugin(key, uri);
-                                                                }
-                                                            });
+                                    return ping(HttpMethod.POST, uri)
+                                            .whenComplete(
+                                                    (v, t) -> {
+                                                        if (t != null || !Boolean.TRUE.equals(v)) {
+                                                            removePlugin(key, uri);
+                                                        }
+                                                    });
                                 })
                         .toList();
-        return CompositeFuture.join(futures);
+        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
     }
 
-    private Future<Boolean> ping(HttpMethod mtd, URI uri) {
+    private CompletableFuture<Boolean> ping(HttpMethod mtd, URI uri) {
         if (Objects.equals(uri, NO_CALLBACK)) {
-            return Future.succeededFuture(true);
+            return CompletableFuture.completedFuture(true);
         }
         HttpRequest<Buffer> req =
                 http.request(mtd, uri.getPort(), uri.getHost(), uri.getPath())
@@ -253,27 +316,34 @@ private Future<Boolean> ping(HttpMethod mtd, URI uri) {
                                     credentials.getCredentials().getUsername(),
                                     credentials.getCredentials().getPassword()));
         }
-        return req.send()
-                .onComplete(
-                        ar -> {
-                            if (ar.failed()) {
-                                logger.info(
-                                        "{} {} failed: {}",
-                                        mtd,
-                                        uri,
-                                        ExceptionUtils.getStackTrace(ar.cause()));
-                                return;
-                            }
-                            logger.info(
-                                    "{} {} status {}: {}",
-                                    mtd,
-                                    uri,
-                                    ar.result().statusCode(),
-                                    ar.result().statusMessage());
-                        })
-                .map(HttpResponse::statusCode)
-                .map(HttpStatusCodeIdentifier::isSuccessCode)
-                .otherwise(false);
+        final HttpRequest<Buffer> freq = req;
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+        executor.submit(
+                () -> {
+                    freq.send()
+                            .onComplete(
+                                    ar -> {
+                                        if (ar.failed()) {
+                                            logger.info(
+                                                    "{} {} failed: {}",
+                                                    mtd,
+                                                    uri,
+                                                    ExceptionUtils.getStackTrace(ar.cause()));
+                                            result.completeExceptionally(ar.cause());
+                                            return;
+                                        }
+                                        logger.info(
+                                                "{} {} status {}: {}",
+                                                mtd,
+                                                uri,
+                                                ar.result().statusCode(),
+                                                ar.result().statusMessage());
+                                        result.complete(
+                                                HttpStatusCodeIdentifier.isSuccessCode(
+                                                        ar.result().statusCode()));
+                                    });
+                });
+        return result;
     }
 
     private Optional<StoredCredentials> getStoredCredentials(URI uri) {
@@ -312,9 +382,7 @@ public UUID register(String realm, URI callback) throws RegistrationException {
         // FIXME this method should return a Future and be performed async
         Objects.requireNonNull(realm, "realm");
         try {
-            CompletableFuture<Boolean> cf = new CompletableFuture<>();
-            ping(HttpMethod.GET, callback)
-                    .onComplete(ar -> cf.complete(ar.succeeded() && ar.result()));
+            CompletableFuture<Boolean> cf = ping(HttpMethod.GET, callback);
             if (!cf.get()) {
                 throw new Exception("callback ping failure");
             }
@@ -360,7 +428,11 @@ private List<AbstractNode> modifyChildrenWithJvmIds(
                 } catch (Exception e) {
                     logger.info("Update node [{}] with null jvmId", child.getName());
                     logger.info(e);
-                    nonConnectableTargets.putIfAbsent((TargetNode) child, id);
+                    ConnectionAttemptRecord attemptRecord = new ConnectionAttemptRecord();
+                    attemptRecord.firstAttemptTimestamp = clock.now().getEpochSecond();
+                    attemptRecord.lastAttemptTimestamp = attemptRecord.firstAttemptTimestamp;
+                    nonConnectableTargets.putIfAbsent(
+                            Pair.of((TargetNode) child, id), attemptRecord);
                 }
                 modifiedChildren.add(child);
             } else if (child instanceof EnvironmentNode) {
@@ -473,4 +545,11 @@ public static class NotFoundException extends RuntimeException {
             super(String.format("Unknown registration id: [%s]", id.toString()));
         }
     }
+
+    private static class ConnectionAttemptRecord {
+        static final long MAX_ATTEMPT_INTERVAL = 60; // seconds from first try to last try
+        long attemptCount;
+        long firstAttemptTimestamp;
+        long lastAttemptTimestamp;
+    }
 }
diff --git a/src/main/java/io/cryostat/net/NetworkModule.java b/src/main/java/io/cryostat/net/NetworkModule.java
index e07a03c676..b7954b7641 100644
--- a/src/main/java/io/cryostat/net/NetworkModule.java
+++ b/src/main/java/io/cryostat/net/NetworkModule.java
@@ -126,6 +126,7 @@ static TargetConnectionManager provideTargetConnectionManager(
             DiscoveryStorage storage,
             @Named(Variables.TARGET_CACHE_TTL) Duration maxTargetTtl,
             @Named(Variables.TARGET_MAX_CONCURRENT_CONNECTIONS) int maxTargetConnections,
+            @Named(Variables.JMX_CONNECTION_TIMEOUT) long connectionTimeoutSeconds,
             Logger logger) {
         return new TargetConnectionManager(
                 connectionToolkit,
@@ -135,6 +136,7 @@ static TargetConnectionManager provideTargetConnectionManager(
                 Scheduler.systemScheduler(),
                 maxTargetTtl,
                 maxTargetConnections,
+                connectionTimeoutSeconds,
                 logger);
     }
 
diff --git a/src/main/java/io/cryostat/net/TargetConnectionManager.java b/src/main/java/io/cryostat/net/TargetConnectionManager.java
index b43cec3aca..2edb0dc089 100644
--- a/src/main/java/io/cryostat/net/TargetConnectionManager.java
+++ b/src/main/java/io/cryostat/net/TargetConnectionManager.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -60,6 +61,7 @@ public class TargetConnectionManager {
     private final Lazy<JFRConnectionToolkit> jfrConnectionToolkit;
     private final Lazy<AgentConnection.Factory> agentConnectionFactory;
     private final Executor executor;
+    private final long connectionTimeoutSeconds;
     private final Logger logger;
 
     private final AsyncLoadingCache<ConnectionDescriptor, JFRConnection> connections;
@@ -74,10 +76,12 @@ public class TargetConnectionManager {
             Scheduler scheduler,
             Duration ttl,
             int maxTargetConnections,
+            long connectionTimeoutSeconds,
             Logger logger) {
         this.jfrConnectionToolkit = jfrConnectionToolkit;
         this.agentConnectionFactory = agentConnectionFactory;
         this.executor = executor;
+        this.connectionTimeoutSeconds = connectionTimeoutSeconds;
         this.logger = logger;
 
         this.targetLocks = new ConcurrentHashMap<>();
@@ -134,7 +138,8 @@ public <T> CompletableFuture<T> executeConnectedTaskAsync(
                                     throw new CompletionException(e);
                                 }
                             },
-                            executor);
+                            executor)
+                    .orTimeout(connectionTimeoutSeconds, TimeUnit.SECONDS);
         }
     }
 
@@ -282,14 +287,15 @@ private class ConnectionLoader
         public CompletableFuture<JFRConnection> asyncLoad(
                 ConnectionDescriptor key, Executor executor) throws Exception {
             return CompletableFuture.supplyAsync(
-                    () -> {
-                        try {
-                            return connect(key);
-                        } catch (Exception e) {
-                            throw new CompletionException(e);
-                        }
-                    },
-                    executor);
+                            () -> {
+                                try {
+                                    return connect(key);
+                                } catch (Exception e) {
+                                    throw new CompletionException(e);
+                                }
+                            },
+                            executor)
+                    .orTimeout(connectionTimeoutSeconds, TimeUnit.SECONDS);
         }
 
         @Override
diff --git a/src/main/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandler.java b/src/main/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandler.java
index 6e6c8f952d..2bfa9fad18 100644
--- a/src/main/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandler.java
+++ b/src/main/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandler.java
@@ -140,7 +140,7 @@ public IntermediateResponse<Void> handle(RequestParameters params) throws ApiExc
     }
 
     private void cleanup(RequestParameters params, Rule rule) {
-        storage.listUniqueReachableServices().stream()
+        storage.listDiscoverableServices().stream()
                 .forEach(
                         (ServiceRef ref) -> {
                             vertx.executeBlocking(
@@ -155,7 +155,7 @@ private void cleanup(RequestParameters params, Rule rule) {
                                                         new ConnectionDescriptor(
                                                                 targetId, credentials);
                                                 recordings.stopRecording(
-                                                        cd, rule.getRecordingName());
+                                                        cd, rule.getRecordingName(), true);
                                             }
                                             promise.complete();
                                         } catch (Exception e) {
diff --git a/src/main/java/io/cryostat/net/web/http/api/v2/RulePatchHandler.java b/src/main/java/io/cryostat/net/web/http/api/v2/RulePatchHandler.java
index a3e59a538e..2ee969d820 100644
--- a/src/main/java/io/cryostat/net/web/http/api/v2/RulePatchHandler.java
+++ b/src/main/java/io/cryostat/net/web/http/api/v2/RulePatchHandler.java
@@ -154,7 +154,7 @@ public IntermediateResponse<Void> handle(RequestParameters params) throws ApiExc
     }
 
     private void cleanup(RequestParameters params, Rule rule) {
-        storage.listUniqueReachableServices().stream()
+        storage.listDiscoverableServices().stream()
                 .forEach(
                         (ServiceRef ref) -> {
                             vertx.executeBlocking(
@@ -169,7 +169,7 @@ private void cleanup(RequestParameters params, Rule rule) {
                                                         new ConnectionDescriptor(
                                                                 targetId, credentials);
                                                 recordings.stopRecording(
-                                                        cd, rule.getRecordingName());
+                                                        cd, rule.getRecordingName(), true);
                                             }
                                             promise.complete();
                                         } catch (Exception e) {
diff --git a/src/main/java/io/cryostat/platform/PlatformClient.java b/src/main/java/io/cryostat/platform/PlatformClient.java
index a1c9ad2b24..282ab7dd45 100644
--- a/src/main/java/io/cryostat/platform/PlatformClient.java
+++ b/src/main/java/io/cryostat/platform/PlatformClient.java
@@ -15,9 +15,7 @@
  */
 package io.cryostat.platform;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.function.Consumer;
 
 import io.cryostat.platform.discovery.EnvironmentNode;
@@ -35,17 +33,10 @@ default void load(Promise<EnvironmentNode> promise) {
 
     List<ServiceRef> listDiscoverableServices();
 
-    default List<ServiceRef> listUniqueReachableServices() {
-        Set<String> uniqueIds = new HashSet<>();
-        return listDiscoverableServices().stream()
-                .filter((ref) -> ref.getJvmId() != null && uniqueIds.add(ref.getJvmId()))
-                .toList();
-    }
-
     default boolean contains(ServiceRef ref) {
         var existingRef =
-                listUniqueReachableServices().stream()
-                        .filter(sr -> !sr.equals(ref) && sr.getJvmId().equals(ref.getJvmId()))
+                listDiscoverableServices().stream()
+                        .filter(sr -> sr.equals(ref) || sr.getJvmId().equals(ref.getJvmId()))
                         .findAny();
         return existingRef.isPresent();
     }
diff --git a/src/main/java/io/cryostat/recordings/JvmIdHelper.java b/src/main/java/io/cryostat/recordings/JvmIdHelper.java
index 9fa402dc29..9b2357001d 100644
--- a/src/main/java/io/cryostat/recordings/JvmIdHelper.java
+++ b/src/main/java/io/cryostat/recordings/JvmIdHelper.java
@@ -114,20 +114,18 @@ public ServiceRef resolveId(ServiceRef sr) throws JvmIdGetException {
         URI serviceUri = sr.getServiceUri();
         String uriStr = serviceUri.toString();
         try {
-            CompletableFuture<String> future =
-                    this.targetConnectionManager.executeConnectedTaskAsync(
-                            new ConnectionDescriptor(uriStr, credentialsManager.getCredentials(sr)),
-                            JFRConnection::getJvmId);
-            future.thenAccept(
-                    id -> {
-                        String prevId = this.ids.synchronous().get(uriStr);
-                        if (Objects.equals(prevId, id)) {
-                            return;
-                        }
-                        this.ids.put(uriStr, CompletableFuture.completedFuture(id));
-                        logger.info("JVM ID: {} -> {}", uriStr, id);
-                    });
-            String id = future.get(connectionTimeoutSeconds, TimeUnit.SECONDS);
+            String id =
+                    computeJvmId(uriStr, Optional.ofNullable(credentialsManager.getCredentials(sr)))
+                            .whenComplete(
+                                    (i, t) -> {
+                                        String prevId = this.ids.synchronous().get(uriStr);
+                                        if (Objects.equals(prevId, i)) {
+                                            return;
+                                        }
+                                        this.ids.put(uriStr, CompletableFuture.completedFuture(i));
+                                        logger.info("JVM ID: {} -> {}", uriStr, i);
+                                    })
+                            .get();
 
             ServiceRef updated = new ServiceRef(id, serviceUri, sr.getAlias().orElse(uriStr));
             updated.setLabels(sr.getLabels());
@@ -135,7 +133,7 @@ public ServiceRef resolveId(ServiceRef sr) throws JvmIdGetException {
             updated.setCryostatAnnotations(sr.getCryostatAnnotations());
             reverse.put(id, sr);
             return updated;
-        } catch (InterruptedException | ExecutionException | TimeoutException | ScriptException e) {
+        } catch (InterruptedException | ExecutionException | ScriptException e) {
             logger.warn("Could not resolve jvmId for target {}", uriStr);
             throw new JvmIdGetException(e, uriStr);
         }
diff --git a/src/main/java/io/cryostat/recordings/RecordingMetadataManager.java b/src/main/java/io/cryostat/recordings/RecordingMetadataManager.java
index 9d338b9cfb..db88573327 100644
--- a/src/main/java/io/cryostat/recordings/RecordingMetadataManager.java
+++ b/src/main/java/io/cryostat/recordings/RecordingMetadataManager.java
@@ -412,8 +412,11 @@ public void accept(TargetDiscoveryEvent tde) {
                             // ID and inform us of that occurrence, and use that invalidation
                             // message to clear our stored metadata
                             break;
+                        case MODIFIED:
+                            handleFoundTarget(tde.getServiceRef());
+                            break;
                         default:
-                            throw new UnsupportedOperationException(tde.getEventKind().toString());
+                            break;
                     }
                 });
     }
diff --git a/src/main/java/io/cryostat/rules/PeriodicArchiver.java b/src/main/java/io/cryostat/rules/PeriodicArchiver.java
index 859050f048..3401115f01 100644
--- a/src/main/java/io/cryostat/rules/PeriodicArchiver.java
+++ b/src/main/java/io/cryostat/rules/PeriodicArchiver.java
@@ -43,7 +43,7 @@ class PeriodicArchiver implements Runnable {
     private final CredentialsManager credentialsManager;
     private final Rule rule;
     private final RecordingArchiveHelper recordingArchiveHelper;
-    private final Function<Pair<ServiceRef, Rule>, Void> failureNotifier;
+    private final Function<Pair<String, Rule>, Void> failureNotifier;
     private final Logger logger;
 
     private final Queue<String> previousRecordings;
@@ -53,7 +53,7 @@ class PeriodicArchiver implements Runnable {
             CredentialsManager credentialsManager,
             Rule rule,
             RecordingArchiveHelper recordingArchiveHelper,
-            Function<Pair<ServiceRef, Rule>, Void> failureNotifier,
+            Function<Pair<String, Rule>, Void> failureNotifier,
             Logger logger) {
         this.serviceRef = serviceRef;
         this.credentialsManager = credentialsManager;
@@ -105,7 +105,7 @@ public void run() {
             if (AbstractAuthenticatedRequestHandler.isJmxAuthFailure(e)
                     || AbstractAuthenticatedRequestHandler.isJmxSslFailure(e)
                     || AbstractAuthenticatedRequestHandler.isServiceTypeFailure(e)) {
-                failureNotifier.apply(Pair.of(serviceRef, rule));
+                failureNotifier.apply(Pair.of(serviceRef.getJvmId(), rule));
             }
         }
     }
diff --git a/src/main/java/io/cryostat/rules/PeriodicArchiverFactory.java b/src/main/java/io/cryostat/rules/PeriodicArchiverFactory.java
index d9c1bbff10..bed9adf551 100644
--- a/src/main/java/io/cryostat/rules/PeriodicArchiverFactory.java
+++ b/src/main/java/io/cryostat/rules/PeriodicArchiverFactory.java
@@ -37,7 +37,7 @@ PeriodicArchiver create(
             CredentialsManager credentialsManager,
             Rule rule,
             RecordingArchiveHelper recordingArchiveHelper,
-            Function<Pair<ServiceRef, Rule>, Void> failureNotifier) {
+            Function<Pair<String, Rule>, Void> failureNotifier) {
         return new PeriodicArchiver(
                 serviceRef,
                 credentialsManager,
diff --git a/src/main/java/io/cryostat/rules/Rule.java b/src/main/java/io/cryostat/rules/Rule.java
index 732b2914b2..e9bc185bef 100644
--- a/src/main/java/io/cryostat/rules/Rule.java
+++ b/src/main/java/io/cryostat/rules/Rule.java
@@ -185,13 +185,45 @@ private static String validateEventSpecifier(String eventSpecifier)
     }
 
     @Override
-    public boolean equals(Object o) {
-        return EqualsBuilder.reflectionEquals(this, o);
+    public boolean equals(Object other) {
+        if (other == null) {
+            return false;
+        }
+        if (other == this) {
+            return true;
+        }
+        if (!(other instanceof Rule)) {
+            return false;
+        }
+        Rule r = (Rule) other;
+        // ignore the enabled state
+        return new EqualsBuilder()
+                .append(name, r.name)
+                .append(description, r.description)
+                .append(matchExpression, r.matchExpression)
+                .append(eventSpecifier, r.eventSpecifier)
+                .append(archivalPeriodSeconds, r.archivalPeriodSeconds)
+                .append(initialDelaySeconds, r.initialDelaySeconds)
+                .append(preservedArchives, r.preservedArchives)
+                .append(maxAgeSeconds, r.maxAgeSeconds)
+                .append(maxSizeBytes, r.maxSizeBytes)
+                .build();
     }
 
     @Override
     public int hashCode() {
-        return HashCodeBuilder.reflectionHashCode(this);
+        // ignore the enabled state
+        return new HashCodeBuilder()
+                .append(name)
+                .append(description)
+                .append(matchExpression)
+                .append(eventSpecifier)
+                .append(archivalPeriodSeconds)
+                .append(initialDelaySeconds)
+                .append(preservedArchives)
+                .append(maxAgeSeconds)
+                .append(maxSizeBytes)
+                .toHashCode();
     }
 
     public static class Builder {
diff --git a/src/main/java/io/cryostat/rules/RuleProcessor.java b/src/main/java/io/cryostat/rules/RuleProcessor.java
index d38fb4cc8e..419050ef6d 100644
--- a/src/main/java/io/cryostat/rules/RuleProcessor.java
+++ b/src/main/java/io/cryostat/rules/RuleProcessor.java
@@ -16,22 +16,24 @@
 package io.cryostat.rules;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import javax.script.ScriptException;
 
 import org.openjdk.jmc.flightrecorder.configuration.recording.RecordingOptionsBuilder;
 import org.openjdk.jmc.rjmx.services.jfr.IRecordingDescriptor;
+import org.openjdk.jmc.rjmx.services.jfr.IRecordingDescriptor.RecordingState;
 
 import io.cryostat.configuration.CredentialsManager;
 import io.cryostat.configuration.CredentialsManager.CredentialsEvent;
@@ -54,11 +56,12 @@
 import io.cryostat.util.events.EventListener;
 
 import io.vertx.core.AbstractVerticle;
-import io.vertx.core.Vertx;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 
 public class RuleProcessor extends AbstractVerticle implements Consumer<TargetDiscoveryEvent> {
 
+    private final ScheduledExecutorService executor;
     private final PlatformClient platformClient;
     private final RuleRegistry registry;
     private final CredentialsManager credentialsManager;
@@ -70,10 +73,10 @@ public class RuleProcessor extends AbstractVerticle implements Consumer<TargetDi
     private final PeriodicArchiverFactory periodicArchiverFactory;
     private final Logger logger;
 
-    private final Map<Pair<ServiceRef, Rule>, Set<Long>> tasks;
+    private final Map<Pair<String, Rule>, Future<?>> tasks;
 
     RuleProcessor(
-            Vertx vertx,
+            ScheduledExecutorService executor,
             PlatformClient platformClient,
             RuleRegistry registry,
             CredentialsManager credentialsManager,
@@ -84,7 +87,7 @@ public class RuleProcessor extends AbstractVerticle implements Consumer<TargetDi
             RecordingMetadataManager metadataManager,
             PeriodicArchiverFactory periodicArchiverFactory,
             Logger logger) {
-        this.vertx = vertx;
+        this.executor = executor;
         this.platformClient = platformClient;
         this.registry = registry;
         this.credentialsManager = credentialsManager;
@@ -95,7 +98,7 @@ public class RuleProcessor extends AbstractVerticle implements Consumer<TargetDi
         this.metadataManager = metadataManager;
         this.periodicArchiverFactory = periodicArchiverFactory;
         this.logger = logger;
-        this.tasks = new HashMap<>();
+        this.tasks = new ConcurrentHashMap<>();
 
         this.registry.addListener(this.ruleListener());
         this.credentialsManager.addListener(this.credentialsListener());
@@ -109,35 +112,21 @@ public void start() {
     @Override
     public void stop() {
         this.platformClient.removeTargetDiscoveryListener(this);
-        this.tasks.forEach((ruleExecution, ids) -> ids.forEach(vertx::cancelTimer));
+        this.tasks.forEach((ruleExecution, task) -> task.cancel(false));
         this.tasks.clear();
     }
 
-    public EventListener<RuleRegistry.RuleEvent, Rule> ruleListener() {
+    EventListener<RuleRegistry.RuleEvent, Rule> ruleListener() {
         return new EventListener<RuleRegistry.RuleEvent, Rule>() {
 
             @Override
             public void onEvent(Event<RuleEvent, Rule> event) {
                 switch (event.getEventType()) {
                     case ADDED:
-                        vertx.<List<ServiceRef>>executeBlocking(
-                                promise ->
-                                        promise.complete(
-                                                platformClient.listUniqueReachableServices()),
-                                false,
-                                result ->
-                                        result.result().stream()
-                                                .filter(
-                                                        serviceRef ->
-                                                                event.getPayload().isEnabled()
-                                                                        && registry.applies(
-                                                                                event.getPayload(),
-                                                                                serviceRef))
-                                                .forEach(
-                                                        serviceRef ->
-                                                                activate(
-                                                                        event.getPayload(),
-                                                                        serviceRef)));
+                        if (!event.getPayload().isEnabled()) {
+                            break;
+                        }
+                        activateRule(event);
                         break;
                     case REMOVED:
                         deactivate(event.getPayload(), null);
@@ -146,54 +135,65 @@ public void onEvent(Event<RuleEvent, Rule> event) {
                         if (!event.getPayload().isEnabled()) {
                             deactivate(event.getPayload(), null);
                         } else {
-                            vertx.<List<ServiceRef>>executeBlocking(
-                                    promise ->
-                                            promise.complete(
-                                                    platformClient.listUniqueReachableServices()),
-                                    false,
-                                    result ->
-                                            result.result().stream()
-                                                    .filter(
-                                                            serviceRef ->
-                                                                    registry.applies(
-                                                                            event.getPayload(),
-                                                                            serviceRef))
-                                                    .forEach(
-                                                            serviceRef ->
-                                                                    activate(
-                                                                            event.getPayload(),
-                                                                            serviceRef)));
+                            activateRule(event);
                         }
                         break;
                     default:
                         throw new UnsupportedOperationException(event.getEventType().toString());
                 }
             }
+
+            private void activateRule(Event<RuleEvent, Rule> event) {
+                executor.submit(
+                        () -> {
+                            platformClient.listDiscoverableServices().stream()
+                                    .filter(
+                                            serviceRef ->
+                                                    registry.applies(
+                                                            event.getPayload(), serviceRef))
+                                    .forEach(
+                                            serviceRef -> activate(event.getPayload(), serviceRef));
+                        });
+            }
         };
     }
 
-    public EventListener<CredentialsManager.CredentialsEvent, String> credentialsListener() {
+    EventListener<CredentialsManager.CredentialsEvent, String> credentialsListener() {
         return new EventListener<CredentialsManager.CredentialsEvent, String>() {
 
             @Override
             public void onEvent(Event<CredentialsEvent, String> event) {
                 switch (event.getEventType()) {
                     case ADDED:
-                        credentialsManager
-                                .resolveMatchingTargets(event.getPayload())
-                                .forEach(
-                                        sr -> {
-                                            registry.getRules(sr).stream()
-                                                    .filter(Rule::isEnabled)
-                                                    .forEach(rule -> activate(rule, sr));
-                                        });
+                        activateRule(event);
                         break;
                     case REMOVED:
+                        // check if we need to re-trigger on each target even though we are removing
+                        // credentials. This targets the case where there may be multiple
+                        // overlapping credentials applying to a single target. This is generally an
+                        // invalid configuration, but if a credential definition is removed, we can
+                        // fairly cheaply check if another set of credentials may apply and allow us
+                        // to trigger a rule activation.
+                        activateRule(event);
                         break;
                     default:
                         throw new UnsupportedOperationException(event.getEventType().toString());
                 }
             }
+
+            private void activateRule(Event<CredentialsEvent, String> event) {
+                executor.submit(
+                        () -> {
+                            credentialsManager
+                                    .resolveMatchingTargets(event.getPayload())
+                                    .forEach(
+                                            sr -> {
+                                                registry.getRules(sr).stream()
+                                                        .filter(Rule::isEnabled)
+                                                        .forEach(rule -> activate(rule, sr));
+                                            });
+                        });
+            }
         };
     }
 
@@ -201,27 +201,36 @@ public void onEvent(Event<CredentialsEvent, String> event) {
     public synchronized void accept(TargetDiscoveryEvent tde) {
         switch (tde.getEventKind()) {
             case FOUND:
-                if (!platformClient.contains(tde.getServiceRef())) {
-                    registry.getRules(tde.getServiceRef())
-                            .forEach(
-                                    rule -> {
-                                        if (rule.isEnabled()) {
-                                            activate(rule, tde.getServiceRef());
-                                        }
-                                    });
-                }
+                activateAllRulesFor(tde.getServiceRef());
                 break;
             case LOST:
                 deactivate(null, tde.getServiceRef());
                 break;
             case MODIFIED:
+                activateAllRulesFor(tde.getServiceRef());
                 break;
             default:
                 throw new UnsupportedOperationException(tde.getEventKind().toString());
         }
     }
 
+    private void activateAllRulesFor(ServiceRef serviceRef) {
+        registry.getRules(serviceRef)
+                .forEach(
+                        rule -> {
+                            if (rule.isEnabled()) {
+                                activate(rule, serviceRef);
+                            }
+                        });
+    }
+
     private void activate(Rule rule, ServiceRef serviceRef) {
+        if (StringUtils.isBlank(serviceRef.getJvmId())) {
+            this.logger.trace(
+                    "Target {} has no JVM ID, aborting rule activation",
+                    serviceRef.getServiceUri());
+        }
+        Pair<String, Rule> key = Pair.of(serviceRef.getJvmId(), rule);
         if (!rule.isEnabled()) {
             this.logger.trace(
                     "Activating rule {} for target {} aborted, rule is disabled {} ",
@@ -230,7 +239,7 @@ private void activate(Rule rule, ServiceRef serviceRef) {
                     rule.isEnabled());
             return;
         }
-        if (tasks.containsKey(Pair.of(serviceRef, rule))) {
+        if (tasks.containsKey(key)) {
             this.logger.trace(
                     "Activating rule {} for target {} aborted, rule is already active",
                     rule.getName(),
@@ -240,71 +249,59 @@ private void activate(Rule rule, ServiceRef serviceRef) {
         this.logger.trace(
                 "Activating rule {} for target {}", rule.getName(), serviceRef.getServiceUri());
 
-        vertx.<Credentials>executeBlocking(
-                        promise -> {
+        executor.submit(
+                () -> {
+                    try {
+                        Credentials credentials = credentialsManager.getCredentials(serviceRef);
+                        if (rule.isArchiver()) {
                             try {
-                                Credentials creds = credentialsManager.getCredentials(serviceRef);
-                                promise.complete(creds);
-                            } catch (ScriptException e) {
-                                promise.fail(e);
+                                archiveRuleRecording(
+                                        new ConnectionDescriptor(serviceRef, credentials), rule);
+                            } catch (Exception e) {
+                                logger.error(e);
                             }
-                        })
-                .onSuccess(c -> logger.trace("Rule activation successful"))
-                .onSuccess(
-                        credentials -> {
-                            if (rule.isArchiver()) {
-                                try {
-                                    archiveRuleRecording(
-                                            new ConnectionDescriptor(serviceRef, credentials),
-                                            rule);
-                                } catch (Exception e) {
-                                    logger.error(e);
-                                }
-                            } else {
-                                try {
-                                    startRuleRecording(
-                                            new ConnectionDescriptor(serviceRef, credentials),
-                                            rule);
-                                } catch (Exception e) {
-                                    logger.error(e);
-                                }
-
-                                PeriodicArchiver periodicArchiver =
-                                        periodicArchiverFactory.create(
-                                                serviceRef,
-                                                credentialsManager,
-                                                rule,
-                                                recordingArchiveHelper,
-                                                this::archivalFailureHandler);
-                                Pair<ServiceRef, Rule> key = Pair.of(serviceRef, rule);
-                                Set<Long> ids = tasks.computeIfAbsent(key, k -> new HashSet<>());
-                                int initialDelay = rule.getInitialDelaySeconds();
-                                int archivalPeriodSeconds = rule.getArchivalPeriodSeconds();
-                                if (initialDelay <= 0) {
-                                    initialDelay = archivalPeriodSeconds;
-                                }
-                                if (rule.getPreservedArchives() <= 0
-                                        || archivalPeriodSeconds <= 0) {
+                        } else {
+                            try {
+                                if (!startRuleRecording(
+                                        new ConnectionDescriptor(serviceRef, credentials), rule)) {
                                     return;
                                 }
-                                long initialTask =
-                                        vertx.setTimer(
-                                                Duration.ofSeconds(initialDelay).toMillis(),
-                                                initialId -> {
-                                                    tasks.get(key).remove(initialId);
-                                                    periodicArchiver.run();
-                                                    long periodicTask =
-                                                            vertx.setPeriodic(
-                                                                    Duration.ofSeconds(
-                                                                                    archivalPeriodSeconds)
-                                                                            .toMillis(),
-                                                                    periodicId ->
-                                                                            periodicArchiver.run());
-                                                    ids.add(periodicTask);
-                                                });
-                                ids.add(initialTask);
+                            } catch (Exception e) {
+                                logger.error(e);
+                                return;
                             }
-                        });
+
+                            if (tasks.containsKey(key)) {
+                                tasks.get(key).cancel(false);
+                            }
+
+                            PeriodicArchiver periodicArchiver =
+                                    periodicArchiverFactory.create(
+                                            serviceRef,
+                                            credentialsManager,
+                                            rule,
+                                            recordingArchiveHelper,
+                                            this::archivalFailureHandler);
+                            int initialDelay = rule.getInitialDelaySeconds();
+                            int archivalPeriodSeconds = rule.getArchivalPeriodSeconds();
+                            if (initialDelay <= 0) {
+                                initialDelay = archivalPeriodSeconds;
+                            }
+                            if (rule.getPreservedArchives() <= 0 || archivalPeriodSeconds <= 0) {
+                                return;
+                            }
+                            Future<?> task =
+                                    executor.scheduleAtFixedRate(
+                                            periodicArchiver::run,
+                                            initialDelay,
+                                            archivalPeriodSeconds,
+                                            TimeUnit.SECONDS);
+                            tasks.put(key, task);
+                        }
+                    } catch (ScriptException e) {
+                        logger.error(e);
+                    }
+                });
     }
 
     private void deactivate(Rule rule, ServiceRef serviceRef) {
@@ -317,25 +314,29 @@ private void deactivate(Rule rule, ServiceRef serviceRef) {
         if (serviceRef != null) {
             logger.trace("Deactivating rules for {}", serviceRef.getServiceUri());
         }
-        Iterator<Map.Entry<Pair<ServiceRef, Rule>, Set<Long>>> it = tasks.entrySet().iterator();
+        Iterator<Map.Entry<Pair<String, Rule>, Future<?>>> it = tasks.entrySet().iterator();
         while (it.hasNext()) {
-            Map.Entry<Pair<ServiceRef, Rule>, Set<Long>> entry = it.next();
-            boolean sameRule = Objects.equals(entry.getKey().getRight(), rule);
-            boolean sameTarget = Objects.equals(entry.getKey().getLeft(), serviceRef);
+            Map.Entry<Pair<String, Rule>, Future<?>> entry = it.next();
+            Pair<String, Rule> key = entry.getKey();
+            Future<?> task = entry.getValue();
+            boolean sameTarget =
+                    serviceRef != null && Objects.equals(key.getLeft(), serviceRef.getJvmId());
+            boolean sameRule = Objects.equals(key.getRight(), rule);
             if (sameRule || sameTarget) {
-                Set<Long> ids = entry.getValue();
-                ids.forEach(
-                        (id) -> {
-                            vertx.cancelTimer(id);
-                            logger.trace("Cancelled timer {}", id);
-                        });
-                it.remove();
+                try {
+                    it.remove();
+                    task.cancel(false);
+                } catch (Exception e) {
+                    logger.error(e);
+                }
             }
         }
     }
 
-    private Void archivalFailureHandler(Pair<ServiceRef, Rule> key) {
-        tasks.get(key).forEach(vertx::cancelTimer);
+    private Void archivalFailureHandler(Pair<String, Rule> key) {
+        if (tasks.containsKey(key)) {
+            tasks.get(key).cancel(false);
+        }
         tasks.remove(key);
         return null;
     }
@@ -365,11 +366,20 @@ private void archiveRuleRecording(ConnectionDescriptor connectionDescriptor, Rul
         }
     }
 
-    private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule rule) {
+    private boolean startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule rule) {
         CompletableFuture<IRecordingDescriptor> future =
                 targetConnectionManager.executeConnectedTaskAsync(
                         connectionDescriptor,
                         connection -> {
+                            Optional<IRecordingDescriptor> opt =
+                                    recordingTargetHelper.getDescriptorByName(
+                                            connection, rule.getRecordingName());
+                            if (opt.isPresent()) {
+                                if (RecordingState.RUNNING.equals(opt.get().getState())) {
+                                    return null;
+                                }
+                            }
+
                             RecordingOptionsBuilder builder =
                                     recordingOptionsBuilderFactory
                                             .create(connection.getService())
@@ -384,7 +394,7 @@ private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule
                                     RecordingTargetHelper.parseEventSpecifierToTemplate(
                                             rule.getEventSpecifier());
                             return recordingTargetHelper.startRecording(
-                                    ReplacementPolicy.ALWAYS,
+                                    ReplacementPolicy.STOPPED,
                                     connectionDescriptor,
                                     builder.build(),
                                     template.getLeft(),
@@ -393,11 +403,14 @@ private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule
                                     false);
                         });
         try {
-            future.handleAsync(
+            return future.handleAsync(
                             (recording, throwable) -> {
                                 if (throwable != null) {
                                     logger.error(new RuleException(throwable));
-                                    return null;
+                                    return false;
+                                }
+                                if (recording == null) {
+                                    return false;
                                 }
                                 try {
                                     Map<String, String> labels =
@@ -415,11 +428,12 @@ private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule
                                 } catch (IOException ioe) {
                                     logger.error(ioe);
                                 }
-                                return null;
+                                return true;
                             })
                     .get();
         } catch (InterruptedException | ExecutionException e) {
             logger.error(new RuleException(e));
+            return false;
         }
     }
 }
diff --git a/src/main/java/io/cryostat/rules/RulesModule.java b/src/main/java/io/cryostat/rules/RulesModule.java
index 2623143a90..6ba72f8d7a 100644
--- a/src/main/java/io/cryostat/rules/RulesModule.java
+++ b/src/main/java/io/cryostat/rules/RulesModule.java
@@ -19,6 +19,7 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.concurrent.Executors;
 import java.util.function.Function;
 
 import javax.inject.Named;
@@ -114,7 +115,6 @@ static MatchExpressionEvaluator provideMatchExpressionEvaluator(
     @Provides
     @Singleton
     static RuleProcessor provideRuleProcessor(
-            Vertx vertx,
             DiscoveryStorage storage,
             RuleRegistry registry,
             CredentialsManager credentialsManager,
@@ -126,7 +126,7 @@ static RuleProcessor provideRuleProcessor(
             PeriodicArchiverFactory periodicArchiverFactory,
             Logger logger) {
         return new RuleProcessor(
-                vertx,
+                Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2),
                 storage,
                 registry,
                 credentialsManager,
diff --git a/src/test/java/io/cryostat/FakeScheduledExecutorService.java b/src/test/java/io/cryostat/FakeScheduledExecutorService.java
new file mode 100644
index 0000000000..37d667ab3d
--- /dev/null
+++ b/src/test/java/io/cryostat/FakeScheduledExecutorService.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright The Cryostat Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cryostat;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class FakeScheduledExecutorService extends DirectExecutorService
+        implements ScheduledExecutorService {
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        return wrap(submit(command));
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+        return wrap(submit(callable));
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(
+            Runnable command, long initialDelay, long period, TimeUnit unit) {
+        return wrap(submit(command));
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(
+            Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        return wrap(submit(command));
+    }
+
+    private <T> ScheduledFuture<T> wrap(Future<T> delegate) {
+        return new ScheduledFuture<T>() {
+            @Override
+            public long getDelay(TimeUnit unit) {
+                return Long.MAX_VALUE;
+            }
+
+            @Override
+            public int compareTo(Delayed o) {
+                return 0;
+            }
+
+            @Override
+            public boolean cancel(boolean mayInterruptIfRunning) {
+                return delegate.cancel(mayInterruptIfRunning);
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return delegate.isCancelled();
+            }
+
+            @Override
+            public boolean isDone() {
+                return delegate.isDone();
+            }
+
+            @Override
+            public T get() throws InterruptedException, ExecutionException {
+                return delegate.get();
+            }
+
+            @Override
+            public T get(long timeout, TimeUnit unit)
+                    throws InterruptedException, ExecutionException, TimeoutException {
+                return delegate.get(timeout, unit);
+            }
+        };
+    }
+}
diff --git a/src/test/java/io/cryostat/discovery/DiscoveryStorageTest.java b/src/test/java/io/cryostat/discovery/DiscoveryStorageTest.java
index 703b9f6c15..d8a6519f74 100644
--- a/src/test/java/io/cryostat/discovery/DiscoveryStorageTest.java
+++ b/src/test/java/io/cryostat/discovery/DiscoveryStorageTest.java
@@ -29,12 +29,14 @@
 import javax.inject.Singleton;
 
 import io.cryostat.DirectExecutorService;
+import io.cryostat.FakeScheduledExecutorService;
 import io.cryostat.MainModule;
 import io.cryostat.MockVertx;
 import io.cryostat.VerticleDeployer;
 import io.cryostat.configuration.CredentialsManager;
 import io.cryostat.core.log.Logger;
 import io.cryostat.core.net.discovery.JvmDiscoveryClient.EventKind;
+import io.cryostat.core.sys.Clock;
 import io.cryostat.discovery.DiscoveryStorage.NotFoundException;
 import io.cryostat.platform.ServiceRef;
 import io.cryostat.platform.TargetDiscoveryEvent;
@@ -83,6 +85,7 @@ class DiscoveryStorageTest {
     @Mock CredentialsManager credentialsManager;
     @Mock MatchExpressionEvaluator matchExpressionEvaluator;
     @Mock WebClient http;
+    @Mock Clock clock;
     @Mock Logger logger;
     Vertx vertx = MockVertx.vertx();
     Gson gson = MainModule.provideGson(logger);
@@ -107,6 +110,7 @@ void setup() {
         this.storage =
                 new DiscoveryStorage(
                         deployer,
+                        new FakeScheduledExecutorService(),
                         new DirectExecutorService(),
                         Duration.ofMinutes(5),
                         () -> builtin,
@@ -116,6 +120,7 @@ void setup() {
                         () -> matchExpressionEvaluator,
                         gson,
                         http,
+                        clock,
                         logger);
         this.storage.init(vertx, null);
     }
@@ -203,13 +208,11 @@ void removesPluginsIfCallbackRejected() throws Exception {
             storage.start(p);
             f.join();
 
-            Mockito.verify(dao).delete(plugin.getId());
+            Mockito.verify(dao, Mockito.times(2)).delete(plugin.getId());
         }
 
         @Test
         void removesPluginsIfCallbackFails() throws Exception {
-            Mockito.when(deployer.deploy(Mockito.any(), Mockito.anyBoolean()))
-                    .thenReturn(Future.succeededFuture());
             EnvironmentNode realm =
                     new EnvironmentNode("realm", BaseNodeType.REALM, Map.of(), Set.of());
             PluginInfo plugin =
@@ -232,7 +235,6 @@ void removesPluginsIfCallbackFails() throws Exception {
             Mockito.when(req.timeout(Mockito.anyLong())).thenReturn(req);
             Mockito.when(req.followRedirects(Mockito.anyBoolean())).thenReturn(req);
 
-            HttpResponse<Buffer> res = Mockito.mock(HttpResponse.class);
             Future<HttpResponse<Buffer>> future = Future.failedFuture("test failure");
             Mockito.when(req.send()).thenReturn(future);
 
@@ -242,7 +244,7 @@ void removesPluginsIfCallbackFails() throws Exception {
             storage.start(p);
             f.join();
 
-            Mockito.verify(dao).delete(plugin.getId());
+            Mockito.verify(dao, Mockito.times(2)).delete(plugin.getId());
         }
 
         @Test
@@ -697,57 +699,6 @@ void listsAllTreeLeaves() {
             MatcherAssert.assertThat(servicesList, Matchers.hasSize(4));
             MatcherAssert.assertThat(servicesList, Matchers.containsInAnyOrder(sr1, sr2, sr3, sr4));
         }
-
-        @Test
-        void listsUniqueNonNullTreeLeaves() throws Exception {
-            ServiceRef sr1 =
-                    new ServiceRef(
-                            null, URI.create("service:jmx:rmi:///jndi/rmi://leaf:1/jmxrmi"), "sr1");
-            TargetNode leaf1 = new TargetNode(BaseNodeType.JVM, sr1);
-            ServiceRef sr2 =
-                    new ServiceRef(
-                            "same-id",
-                            URI.create("service:jmx:rmi:///jndi/rmi://leaf:2/jmxrmi"),
-                            "sr2");
-            TargetNode leaf2 = new TargetNode(BaseNodeType.JVM, sr2);
-            EnvironmentNode realm1 =
-                    new EnvironmentNode(
-                            DefaultPlatformClient.REALM,
-                            BaseNodeType.REALM,
-                            Map.of(),
-                            Set.of(leaf1, leaf2));
-            PluginInfo plugin1 = new PluginInfo();
-            plugin1.setSubtree(gson.toJson(realm1));
-
-            ServiceRef sr3 =
-                    new ServiceRef(
-                            "same-id",
-                            URI.create("service:jmx:rmi:///jndi/rmi://leaf:3/jmxrmi"),
-                            "sr3");
-            TargetNode leaf3 = new TargetNode(BaseNodeType.JVM, sr3);
-            ServiceRef sr4 =
-                    new ServiceRef(
-                            "other-id",
-                            URI.create("service:jmx:rmi:///jndi/rmi://leaf:4/jmxrmi"),
-                            "sr4");
-            TargetNode leaf4 = new TargetNode(BaseNodeType.JVM, sr4);
-            EnvironmentNode realm2 =
-                    new EnvironmentNode(
-                            CustomTargetPlatformClient.REALM,
-                            BaseNodeType.REALM,
-                            Map.of(),
-                            Set.of(leaf3, leaf4));
-            PluginInfo plugin2 = new PluginInfo();
-            plugin2.setSubtree(gson.toJson(realm2));
-
-            Mockito.when(dao.getAll()).thenReturn(List.of(plugin1, plugin2));
-
-            List<ServiceRef> servicesList = storage.listUniqueReachableServices();
-
-            MatcherAssert.assertThat(servicesList, Matchers.hasSize(2));
-            MatcherAssert.assertThat(servicesList, Matchers.containsInRelativeOrder(sr2, sr4));
-            // sr2 over sr3 because RealmOrder of "JDP" has higher priority than "Custom Targets"
-        }
     }
 
     @Nested
diff --git a/src/test/java/io/cryostat/net/TargetConnectionManagerTest.java b/src/test/java/io/cryostat/net/TargetConnectionManagerTest.java
index 05beba2d6f..c1171d2397 100644
--- a/src/test/java/io/cryostat/net/TargetConnectionManagerTest.java
+++ b/src/test/java/io/cryostat/net/TargetConnectionManagerTest.java
@@ -63,6 +63,7 @@ void setup() {
                         Scheduler.disabledScheduler(),
                         TTL,
                         -1,
+                        10,
                         logger);
     }
 
@@ -156,6 +157,7 @@ void shouldCreateNewConnectionForAccessDelayedLongerThanTTL() throws Exception {
                         Scheduler.systemScheduler(),
                         Duration.ofNanos(1),
                         1,
+                        10,
                         logger);
         Mockito.when(jfrConnectionToolkit.connect(Mockito.any(), Mockito.any(), Mockito.any()))
                 .thenAnswer(
@@ -185,6 +187,7 @@ void shouldCreateNewConnectionPerTarget() throws Exception {
                         Scheduler.disabledScheduler(),
                         Duration.ofNanos(1),
                         -1,
+                        10,
                         logger);
         Mockito.when(jfrConnectionToolkit.connect(Mockito.any(), Mockito.any(), Mockito.any()))
                 .thenAnswer(
@@ -223,6 +226,7 @@ void shouldConnectToAgents(String url) throws Exception {
                         Scheduler.disabledScheduler(),
                         Duration.ofNanos(1),
                         -1,
+                        10,
                         logger);
         ConnectionDescriptor desc = new ConnectionDescriptor(url);
         JFRConnection conn = mgr.executeConnectedTask(desc, a -> a);
diff --git a/src/test/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandlerTest.java b/src/test/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandlerTest.java
index 4ea311a12f..72307e892c 100644
--- a/src/test/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandlerTest.java
+++ b/src/test/java/io/cryostat/net/web/http/api/v2/RuleDeleteHandlerTest.java
@@ -186,7 +186,7 @@ void shouldRespondWith404ForNonexistentRule() throws Exception {
             Mockito.verify(registry, Mockito.never()).deleteRule(Mockito.anyString());
             Mockito.verify(registry, Mockito.never()).applies(Mockito.any(), Mockito.any());
             Mockito.verify(recordingTargetHelper, Mockito.never())
-                    .stopRecording(Mockito.any(), Mockito.any());
+                    .stopRecording(Mockito.any(), Mockito.any(), Mockito.eq(true));
         }
 
         @Test
@@ -211,11 +211,13 @@ void shouldRespondWith200ForCleanupFailures() throws Exception {
                             "id",
                             new URI("service:jmx:rmi:///jndi/rmi://cryostat:9091/jmxrmi"),
                             "io.cryostat.Cryostat");
-            Mockito.when(storage.listUniqueReachableServices()).thenReturn(List.of(serviceRef));
+            Mockito.when(storage.listDiscoverableServices()).thenReturn(List.of(serviceRef));
 
             FlightRecorderException exception =
                     new FlightRecorderException(new Exception("test message"));
-            Mockito.when(recordingTargetHelper.stopRecording(Mockito.any(), Mockito.any()))
+            Mockito.when(
+                            recordingTargetHelper.stopRecording(
+                                    Mockito.any(), Mockito.any(), Mockito.eq(true)))
                     .thenThrow(exception);
 
             IntermediateResponse<Void> response = handler.handle(params);
@@ -225,7 +227,8 @@ void shouldRespondWith200ForCleanupFailures() throws Exception {
             Mockito.verify(registry).deleteRule(rule);
             Mockito.verify(registry).applies(rule, serviceRef);
             Mockito.verify(recordingTargetHelper)
-                    .stopRecording(Mockito.any(), Mockito.eq(rule.getRecordingName()));
+                    .stopRecording(
+                            Mockito.any(), Mockito.eq(rule.getRecordingName()), Mockito.eq(true));
             Mockito.verify(logger).error(exception);
         }
 
@@ -245,7 +248,7 @@ void shouldRespondWith200AfterCleanupNoop() throws Exception {
             Mockito.when(registry.hasRuleByName(testRuleName)).thenReturn(true);
             Mockito.when(registry.getRule(testRuleName)).thenReturn(Optional.of(rule));
 
-            Mockito.when(storage.listUniqueReachableServices()).thenReturn(List.of());
+            Mockito.when(storage.listDiscoverableServices()).thenReturn(List.of());
 
             IntermediateResponse<Void> response = handler.handle(params);
             MatcherAssert.assertThat(response.getStatusCode(), Matchers.equalTo(200));
@@ -254,7 +257,8 @@ void shouldRespondWith200AfterCleanupNoop() throws Exception {
             Mockito.verify(registry).deleteRule(rule);
             Mockito.verify(registry, Mockito.never()).applies(Mockito.any(), Mockito.any());
             Mockito.verify(recordingTargetHelper, Mockito.never())
-                    .stopRecording(Mockito.any(), Mockito.eq(rule.getRecordingName()));
+                    .stopRecording(
+                            Mockito.any(), Mockito.eq(rule.getRecordingName()), Mockito.eq(true));
         }
 
         @Test
@@ -279,7 +283,7 @@ void shouldRespondWith200AfterSuccessfulCleanup() throws Exception {
                             "id",
                             new URI("service:jmx:rmi:///jndi/rmi://cryostat:9091/jmxrmi"),
                             "io.cryostat.Cryostat");
-            Mockito.when(storage.listUniqueReachableServices()).thenReturn(List.of(serviceRef));
+            Mockito.when(storage.listDiscoverableServices()).thenReturn(List.of(serviceRef));
 
             IntermediateResponse<Void> response = handler.handle(params);
             MatcherAssert.assertThat(response.getStatusCode(), Matchers.equalTo(200));
@@ -288,7 +292,8 @@ void shouldRespondWith200AfterSuccessfulCleanup() throws Exception {
             Mockito.verify(registry).deleteRule(rule);
             Mockito.verify(registry).applies(Mockito.any(), Mockito.any());
             Mockito.verify(recordingTargetHelper)
-                    .stopRecording(Mockito.any(), Mockito.eq(rule.getRecordingName()));
+                    .stopRecording(
+                            Mockito.any(), Mockito.eq(rule.getRecordingName()), Mockito.eq(true));
         }
     }
 }
diff --git a/src/test/java/io/cryostat/net/web/http/api/v2/RulePatchHandlerTest.java b/src/test/java/io/cryostat/net/web/http/api/v2/RulePatchHandlerTest.java
index 240be4ba6f..afb680163b 100644
--- a/src/test/java/io/cryostat/net/web/http/api/v2/RulePatchHandlerTest.java
+++ b/src/test/java/io/cryostat/net/web/http/api/v2/RulePatchHandlerTest.java
@@ -207,7 +207,7 @@ void shouldDisableRuleAndCleanup() throws Exception {
                             "id",
                             new URI("service:jmx:rmi:///jndi/rmi://cryostat:9091/jmxrmi"),
                             "io.cryostat.Cryostat");
-            Mockito.when(storage.listUniqueReachableServices()).thenReturn(List.of(serviceRef));
+            Mockito.when(storage.listDiscoverableServices()).thenReturn(List.of(serviceRef));
 
             IntermediateResponse<Void> response = handler.handle(params);
 
@@ -226,7 +226,8 @@ void shouldDisableRuleAndCleanup() throws Exception {
             Mockito.verify(recordingTargetHelper)
                     .stopRecording(
                             Mockito.any(ConnectionDescriptor.class),
-                            Mockito.eq(rule.getRecordingName()));
+                            Mockito.eq(rule.getRecordingName()),
+                            Mockito.eq(true));
 
             MatcherAssert.assertThat(response.getStatusCode(), Matchers.equalTo(204));
         }
diff --git a/src/test/java/io/cryostat/recordings/RecordingArchiveHelperTest.java b/src/test/java/io/cryostat/recordings/RecordingArchiveHelperTest.java
index 492106b4d0..5ce61543e0 100644
--- a/src/test/java/io/cryostat/recordings/RecordingArchiveHelperTest.java
+++ b/src/test/java/io/cryostat/recordings/RecordingArchiveHelperTest.java
@@ -1165,12 +1165,15 @@ public String answer(InvocationOnMock invocation) throws Throwable {
     void shouldGetArchivedTimeFromTimestamp() {
         // December 19, 2019 | 8:38:34 PM UTC
         String timestamp = "20191219T213834Z";
-        Long time = recordingArchiveHelper.getArchivedTimeFromTimestamp(timestamp);
+        long time = recordingArchiveHelper.getArchivedTimeFromTimestamp(timestamp);
         long expectedArchivedTime = Instant.parse("2019-12-19T21:38:34.00Z").toEpochMilli();
         MatcherAssert.assertThat(time, Matchers.equalTo(expectedArchivedTime));
 
         String invalid = "";
-        Long invalidTime = recordingArchiveHelper.getArchivedTimeFromTimestamp(invalid);
-        MatcherAssert.assertThat(invalidTime, Matchers.equalTo(Instant.now().toEpochMilli()));
+        long invalidTime = recordingArchiveHelper.getArchivedTimeFromTimestamp(invalid);
+        long tolerance = 10;
+        long now = Instant.now().toEpochMilli();
+        MatcherAssert.assertThat(invalidTime, Matchers.lessThanOrEqualTo(now + tolerance));
+        MatcherAssert.assertThat(invalidTime, Matchers.greaterThanOrEqualTo(now - tolerance));
     }
 }
diff --git a/src/test/java/io/cryostat/rules/RuleProcessorTest.java b/src/test/java/io/cryostat/rules/RuleProcessorTest.java
index bb47851bac..bcdc5c6a80 100644
--- a/src/test/java/io/cryostat/rules/RuleProcessorTest.java
+++ b/src/test/java/io/cryostat/rules/RuleProcessorTest.java
@@ -15,12 +15,13 @@
  */
 package io.cryostat.rules;
 
-import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.Mockito.*;
-
 import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.openjdk.jmc.common.unit.IConstrainedMap;
@@ -28,7 +29,7 @@
 import org.openjdk.jmc.rjmx.services.jfr.IFlightRecorderService;
 import org.openjdk.jmc.rjmx.services.jfr.IRecordingDescriptor;
 
-import io.cryostat.MockVertx;
+import io.cryostat.FakeScheduledExecutorService;
 import io.cryostat.configuration.CredentialsManager;
 import io.cryostat.configuration.CredentialsManager.CredentialsEvent;
 import io.cryostat.core.log.Logger;
@@ -50,8 +51,6 @@
 import io.cryostat.util.events.Event;
 import io.cryostat.util.events.EventListener;
 
-import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
 import org.apache.commons.lang3.tuple.Pair;
 import org.hamcrest.MatcherAssert;
 import org.hamcrest.Matchers;
@@ -70,7 +69,7 @@
 class RuleProcessorTest {
 
     RuleProcessor processor;
-    Vertx vertx;
+    FakeScheduledExecutorService executor;
     @Mock PlatformClient platformClient;
     @Mock RuleRegistry registry;
     @Mock CredentialsManager credentialsManager;
@@ -87,10 +86,10 @@ class RuleProcessorTest {
 
     @BeforeEach
     void setup() {
-        this.vertx = MockVertx.vertx();
+        this.executor = Mockito.spy(new FakeScheduledExecutorService());
         this.processor =
                 new RuleProcessor(
-                        vertx,
+                        executor,
                         platformClient,
                         registry,
                         credentialsManager,
@@ -172,6 +171,28 @@ void testSuccessfulRuleActivationWithCredentials() throws Exception {
 
         Mockito.when(registry.getRules(serviceRef)).thenReturn(Set.of(rule));
 
+        IRecordingDescriptor autoRule = Mockito.mock(IRecordingDescriptor.class);
+
+        Mockito.when(recordingTargetHelper.getDescriptorByName(Mockito.any(), Mockito.any()))
+                .thenReturn(Optional.empty())
+                .thenReturn(Optional.of(autoRule));
+
+        Mockito.when(
+                        recordingTargetHelper.startRecording(
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.anyBoolean()))
+                .thenReturn(autoRule);
+
+        Metadata metadata = new Metadata(Map.of());
+
+        Mockito.when(metadataManager.getMetadata(Mockito.any(), Mockito.any()))
+                .thenReturn(metadata);
+
         PeriodicArchiver periodicArchiver = Mockito.mock(PeriodicArchiver.class);
         Mockito.when(
                         periodicArchiverFactory.create(
@@ -217,7 +238,7 @@ void testSuccessfulRuleActivationWithCredentials() throws Exception {
                         archiveOnStopCaptor.capture());
 
         MatcherAssert.assertThat(
-                replaceCaptor.getValue(), Matchers.equalTo(ReplacementPolicy.ALWAYS));
+                replaceCaptor.getValue(), Matchers.equalTo(ReplacementPolicy.STOPPED));
 
         ConnectionDescriptor connectionDescriptor = connectionDescriptorCaptor.getValue();
         MatcherAssert.assertThat(
@@ -235,16 +256,16 @@ void testSuccessfulRuleActivationWithCredentials() throws Exception {
 
         MatcherAssert.assertThat(metadataCaptor.getValue(), Matchers.equalTo(new Metadata()));
 
-        ArgumentCaptor<Handler<Long>> handlerCaptor = ArgumentCaptor.forClass(Handler.class);
-        Mockito.verify(vertx).setTimer(Mockito.eq(67_000L), handlerCaptor.capture());
+        ArgumentCaptor<Runnable> handlerCaptor = ArgumentCaptor.forClass(Runnable.class);
+        Mockito.verify(executor)
+                .scheduleAtFixedRate(
+                        handlerCaptor.capture(),
+                        Mockito.eq((long) rule.getInitialDelaySeconds()),
+                        Mockito.eq((long) rule.getArchivalPeriodSeconds()),
+                        Mockito.eq(TimeUnit.SECONDS));
 
-        Mockito.verify(periodicArchiver, Mockito.times(0)).run();
-        handlerCaptor.getValue().handle(1234L);
         Mockito.verify(periodicArchiver, Mockito.times(1)).run();
-
-        Mockito.verify(vertx).setPeriodic(Mockito.eq(67_000L), handlerCaptor.capture());
-
-        handlerCaptor.getValue().handle(1234L);
+        handlerCaptor.getValue().run();
         Mockito.verify(periodicArchiver, Mockito.times(2)).run();
     }
 
@@ -334,9 +355,56 @@ void testTaskCancellationOnFailure() throws Exception {
                         .archivalPeriodSeconds(67)
                         .build();
 
+        RecordingOptionsBuilder recordingOptionsBuilder =
+                Mockito.mock(RecordingOptionsBuilder.class);
+        Mockito.when(recordingOptionsBuilder.name(Mockito.any()))
+                .thenReturn(recordingOptionsBuilder);
+        Mockito.when(recordingOptionsBuilder.toDisk(Mockito.anyBoolean()))
+                .thenReturn(recordingOptionsBuilder);
+        Mockito.when(recordingOptionsBuilder.maxAge(Mockito.anyLong()))
+                .thenReturn(recordingOptionsBuilder);
+        Mockito.when(recordingOptionsBuilder.maxSize(Mockito.anyLong()))
+                .thenReturn(recordingOptionsBuilder);
+        Mockito.when(recordingOptionsBuilderFactory.create(Mockito.any()))
+                .thenReturn(recordingOptionsBuilder);
+        IConstrainedMap<String> recordingOptions = Mockito.mock(IConstrainedMap.class);
+        Mockito.when(recordingOptionsBuilder.build()).thenReturn(recordingOptions);
+
+        Mockito.when(
+                        targetConnectionManager.executeConnectedTaskAsync(
+                                Mockito.any(), Mockito.any()))
+                .thenAnswer(
+                        arg0 ->
+                                CompletableFuture.completedFuture(
+                                        ((TargetConnectionManager.ConnectedTask<Object>)
+                                                        arg0.getArgument(1))
+                                                .execute(connection)));
+
         Mockito.when(registry.getRules(serviceRef)).thenReturn(Set.of(rule));
 
-        PeriodicArchiver periodicArchiver = Mockito.mock(PeriodicArchiver.class);
+        IRecordingDescriptor autoRule = Mockito.mock(IRecordingDescriptor.class);
+
+        Mockito.when(recordingTargetHelper.getDescriptorByName(Mockito.any(), Mockito.any()))
+                .thenReturn(Optional.empty())
+                .thenReturn(Optional.of(autoRule));
+
+        Mockito.when(
+                        recordingTargetHelper.startRecording(
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.anyBoolean()))
+                .thenReturn(autoRule);
+
+        Metadata metadata = new Metadata(Map.of());
+
+        Mockito.when(metadataManager.getMetadata(Mockito.any(), Mockito.any()))
+                .thenReturn(metadata);
+
+        PeriodicArchiver[] pa = new PeriodicArchiver[1];
         Mockito.when(
                         periodicArchiverFactory.create(
                                 Mockito.any(),
@@ -344,27 +412,50 @@ void testTaskCancellationOnFailure() throws Exception {
                                 Mockito.any(),
                                 Mockito.any(),
                                 Mockito.any()))
-                .thenReturn(periodicArchiver);
+                .thenAnswer(
+                        new Answer<PeriodicArchiver>() {
+                            @Override
+                            public PeriodicArchiver answer(InvocationOnMock invocation)
+                                    throws Throwable {
+                                CredentialsManager cm = invocation.getArgument(1);
+                                Function<Pair<String, Rule>, Void> fn = invocation.getArgument(4);
+                                PeriodicArchiver p =
+                                        new PeriodicArchiver(
+                                                serviceRef,
+                                                cm,
+                                                rule,
+                                                recordingArchiveHelper,
+                                                fn,
+                                                logger);
+                                pa[0] = p;
+                                return p;
+                            }
+                        });
+
+        ScheduledFuture task = Mockito.mock(ScheduledFuture.class);
+        Mockito.doReturn(task)
+                .when(executor)
+                .scheduleAtFixedRate(
+                        Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any());
 
         processor.accept(tde);
 
-        Mockito.verify(vertx).setTimer(Mockito.eq(67_000L), Mockito.any());
+        Mockito.verify(executor)
+                .scheduleAtFixedRate(
+                        Mockito.any(),
+                        Mockito.eq((long) rule.getInitialDelaySeconds()),
+                        Mockito.eq((long) rule.getArchivalPeriodSeconds()),
+                        Mockito.eq(TimeUnit.SECONDS));
 
-        ArgumentCaptor<Function<Pair<ServiceRef, Rule>, Void>> functionCaptor =
-                ArgumentCaptor.forClass(Function.class);
         Mockito.verify(periodicArchiverFactory)
-                .create(
-                        Mockito.any(),
-                        Mockito.any(),
-                        Mockito.any(),
-                        Mockito.any(),
-                        functionCaptor.capture());
-        Function<Pair<ServiceRef, Rule>, Void> failureFunction = functionCaptor.getValue();
-        Mockito.verify(vertx, Mockito.never()).cancelTimer(MockVertx.TIMER_ID);
+                .create(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito.verify(task, Mockito.never()).cancel(Mockito.anyBoolean());
 
-        failureFunction.apply(Pair.of(serviceRef, rule));
+        Mockito.when(recordingArchiveHelper.getRecordings(Mockito.any()))
+                .thenReturn(CompletableFuture.failedFuture(new SecurityException()));
+        pa[0].run();
 
-        Mockito.verify(vertx).cancelTimer(MockVertx.TIMER_ID);
+        Mockito.verify(task).cancel(false);
     }
 
     @Test
@@ -472,7 +563,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
                         archiveOnStopCaptor.capture());
 
         MatcherAssert.assertThat(
-                replaceCaptor.getValue(), Matchers.equalTo(ReplacementPolicy.ALWAYS));
+                replaceCaptor.getValue(), Matchers.equalTo(ReplacementPolicy.STOPPED));
 
         IConstrainedMap<String> actualRecordingOptions = recordingOptionsCaptor.getValue();
         MatcherAssert.assertThat(actualRecordingOptions, Matchers.sameInstance(recordingOptions));
@@ -511,7 +602,7 @@ void testSuccessfulRuleNonActivationWithCredentials() throws Exception {
 
         processor.accept(tde);
 
-        Mockito.verify(recordingOptionsBuilder, never()).name("auto_Test_Rule");
+        Mockito.verify(recordingOptionsBuilder, Mockito.never()).name("auto_Test_Rule");
 
         ArgumentCaptor<ReplacementPolicy> replaceCaptor =
                 ArgumentCaptor.forClass(ReplacementPolicy.class);
@@ -531,7 +622,7 @@ void testSuccessfulRuleNonActivationWithCredentials() throws Exception {
 
         ArgumentCaptor<Boolean> archiveOnStopCaptor = ArgumentCaptor.forClass(Boolean.class);
 
-        Mockito.verify(recordingTargetHelper, never())
+        Mockito.verify(recordingTargetHelper, Mockito.never())
                 .startRecording(
                         replaceCaptor.capture(),
                         connectionDescriptorCaptor.capture(),
diff --git a/src/test/java/itest/AutoRulesCleanupIT.java b/src/test/java/itest/AutoRulesCleanupIT.java
index a0a555d7ee..0bb77713b8 100644
--- a/src/test/java/itest/AutoRulesCleanupIT.java
+++ b/src/test/java/itest/AutoRulesCleanupIT.java
@@ -201,6 +201,8 @@ void testAddRule() throws TimeoutException, InterruptedException, ExecutionExcep
     @Order(2)
     void testAddRuleTriggersRecordingCreation()
             throws TimeoutException, InterruptedException, ExecutionException {
+        Thread.sleep(3_000); // Wait for rule to trigger
+
         CompletableFuture<JsonArray> future = new CompletableFuture<>();
         webClient
                 .get(String.format("/api/v1/targets/%s/recordings", jmxServiceUrlEncoded))