diff --git a/CHANGES.txt b/CHANGES.txt index 22818c9313e8..b9d4aa1fc902 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0.1 + * Harden PrunableArrayQueue against Pruner implementations that might throw exceptions (CASSANDRA-16866) * Move RepairedDataInfo to the execution controller rather than the ReadCommand to avoid unintended sharing (CASSANDRA-16721) * Bump zstd-jni version to 1.5.0-4 (CASSANDRA-16884) * Remove assumption that all urgent messages are small (CASSANDRA-16877) diff --git a/src/java/org/apache/cassandra/net/PrunableArrayQueue.java b/src/java/org/apache/cassandra/net/PrunableArrayQueue.java index 1fca43ca2b33..9c91ca71255c 100644 --- a/src/java/org/apache/cassandra/net/PrunableArrayQueue.java +++ b/src/java/org/apache/cassandra/net/PrunableArrayQueue.java @@ -19,6 +19,8 @@ import java.util.function.Predicate; +import org.apache.cassandra.utils.Throwables; + /** * A growing array-based queue that allows efficient bulk in-place removal. * @@ -103,10 +105,12 @@ boolean isEmpty() * * @return count of removed elements. */ + @SuppressWarnings("ThrowFromFinallyBlock") int prune(Pruner pruner) { E e; int removed = 0; + Throwable error = null; try { @@ -120,11 +124,35 @@ int prune(Pruner pruner) int k = (tail - 1 - i) & mask; e = buffer[k]; - if (pruner.shouldPrune(e)) + boolean shouldPrune = false; + + // If any error has been thrown from the Pruner callbacks, don't bother asking the + // pruner. Just move any elements that need to be moved, correct the head, and rethrow. + if (error == null) + { + try + { + shouldPrune = pruner.shouldPrune(e); + } + catch (Throwable t) + { + error = t; + } + } + + if (shouldPrune) { buffer[k] = null; removed++; - pruner.onPruned(e); + + try + { + pruner.onPruned(e); + } + catch (Throwable t) + { + error = t; + } } else { @@ -133,13 +161,28 @@ int prune(Pruner pruner) buffer[(k + removed) & mask] = e; buffer[k] = null; } - pruner.onKept(e); + + try + { + pruner.onKept(e); + } + catch (Throwable t) + { + if (error == null) + { + error = t; + } + } } } } finally { head = (head + removed) & mask; + + // Rethrow any error(s) from the Pruner callbacks, but only after the queue state is valid. + if (error != null) + throw Throwables.unchecked(error); } return removed; diff --git a/test/unit/org/apache/cassandra/net/PrunableArrayQueueTest.java b/test/unit/org/apache/cassandra/net/PrunableArrayQueueTest.java index c4fd55a8aa27..34f61a611498 100644 --- a/test/unit/org/apache/cassandra/net/PrunableArrayQueueTest.java +++ b/test/unit/org/apache/cassandra/net/PrunableArrayQueueTest.java @@ -17,14 +17,21 @@ */ package org.apache.cassandra.net; +import java.util.Random; + import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.net.PrunableArrayQueue; import static org.junit.Assert.*; public class PrunableArrayQueueTest { + private static final Logger logger = LoggerFactory.getLogger(PrunableArrayQueueTest.class); + private final PrunableArrayQueue queue = new PrunableArrayQueue<>(8); @Test @@ -127,4 +134,70 @@ public void onKept(Integer val) assertEquals((Integer) i, queue.poll()); assertTrue(queue.isEmpty()); } + + @Test + public void testUnreliablePruner() + { + long seed = System.currentTimeMillis(); + Random rand = new Random(seed); + + logger.info("Testing unreliable pruner with random seed {}...", seed); + + int iteratons = 100; + int startingQueueSize = 1024; + double pruneChance = 0.1; + double errorOnKeptChance = 0.00005; + double errorOnPruneChance = 0.00002; + + for (int i = 0; i < iteratons; i++) + { + int failureValue = rand.nextInt(startingQueueSize); + + PrunableArrayQueue testQueue = new PrunableArrayQueue<>(startingQueueSize); + + for (int o = 0; o < startingQueueSize; o++) + testQueue.offer(o); + + class UnreliablePruner implements PrunableArrayQueue.Pruner + { + public boolean shouldPrune(Integer value) + { + if (rand.nextDouble() < errorOnPruneChance) + throw new RuntimeException("Failed on pruning check for value: " + value); + + return rand.nextDouble() < pruneChance; + } + + public void onPruned(Integer value) + { + if (value == failureValue) + throw new RuntimeException("Failed on pruned value: " + value); + } + + public void onKept(Integer value) + { + if (rand.nextDouble() < errorOnKeptChance) + throw new RuntimeException("Failed on retained value: " + value); + } + } + + assertEquals(startingQueueSize, testQueue.size()); + + try + { + testQueue.prune(new UnreliablePruner()); + } + catch (RuntimeException e) + { + logger.info("Expected pruning failure with seed {}", seed, e); + } + + for (int p = 0, postPruneSize = testQueue.size(); p < postPruneSize; p++) + { + assertNotNull("Queue should contain no null elements after pruning. Seed: " + seed + ". Iteration: " + i, testQueue.poll()); + } + + assertEquals("Queue size should be zero after draining. Seed: " + seed + ". Iteration: " + i, 0, testQueue.size()); + } + } } \ No newline at end of file