From 9081d0088119f02c8118598ac9fdb8ce39f0033b Mon Sep 17 00:00:00 2001 From: Marcus Eriksson Date: Tue, 15 Oct 2019 14:58:14 +0200 Subject: [PATCH] Avoid adding fake row deletions if they are superseeded by the partition deletion Patch by marcuse; reviewed by Aleksey Yeshchenko and Benedict Elliott Smith for CASSANDRA-15363 --- .../cassandra/distributed/api/IInstance.java | 3 + .../impl/DelegatingInvokableInstance.java | 10 ++++ .../cassandra/distributed/impl/Instance.java | 19 ++++++ .../upgrade/MixedModeReadRepairTest.java | 59 +++++++++++++++++++ .../distributed/upgrade/UpgradeTestBase.java | 16 ++++- 5 files changed, 106 insertions(+), 1 deletion(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java index d5382b4970f0..6a9e33a09ccc 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java +++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java @@ -47,4 +47,7 @@ public interface IInstance extends IIsolatedExecutor int getMessagingVersion(); void setMessagingVersion(InetAddressAndPort endpoint, int version); + + void flush(String keyspace); + void forceCompact(String keyspace, String table); } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java index e9e684463d1c..72949447eb46 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java @@ -85,6 +85,16 @@ public void setMessagingVersion(InetAddressAndPort endpoint, int version) delegate().setMessagingVersion(endpoint, version); } + public void flush(String keyspace) + { + delegate().flush(keyspace); + } + + public void forceCompact(String keyspace, String table) + { + delegate().forceCompact(keyspace, table); + } + @Override public IInstanceConfig config() { diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index af5ec65970ff..e405d6ba514f 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -343,6 +343,25 @@ public void setMessagingVersion(InetAddressAndPort endpoint, int version) runOnInstance(() -> MessagingService.instance().setVersion(endpoint.address, version)); } + public void flush(String keyspace) + { + runOnInstance(() -> FBUtilities.waitOnFutures(Keyspace.open(keyspace).flush())); + } + + public void forceCompact(String keyspace, String table) + { + runOnInstance(() -> { + try + { + Keyspace.open(keyspace).getColumnFamilyStore(table).forceMajorCompaction(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } + @Override public void startup(ICluster cluster) { diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java new file mode 100644 index 000000000000..31f4b84ccf2c --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import org.junit.Test; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.distributed.impl.Versions; +import org.apache.cassandra.distributed.test.DistributedTestBase; + +import static org.apache.cassandra.distributed.impl.Versions.find; + +public class MixedModeReadRepairTest extends UpgradeTestBase +{ + @Test + public void mixedModeReadRepairCompactStorage() throws Throwable + { + new TestCase() + .nodes(2) + .upgrade(Versions.Major.v22, Versions.Major.v30) + .nodesToUpgrade(2) + .setup((cluster) -> cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE")) + .runAfterClusterUpgrade((cluster) -> { + // now node2 is 3.0 and node1 is 2.2 + // make sure 2.2 side does not get the mutation + cluster.get(2).executeInternal("DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?", + "something"); + // trigger a read repair + cluster.coordinator(1).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?", + ConsistencyLevel.ALL, + "something"); + cluster.get(1).flush(DistributedTestBase.KEYSPACE); + // upgrade node1 to 3.0 + cluster.get(1).shutdown().get(); + Versions allVersions = find(); + cluster.get(1).setVersion(allVersions.getLatest(Versions.Major.v30)); + cluster.get(1).startup(); + + // and make sure the sstables are readable + cluster.get(1).forceCompact(DistributedTestBase.KEYSPACE, "tbl"); + }).run(); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java index 44037675bac9..27094d8bd90c 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java @@ -20,7 +20,9 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.cassandra.distributed.UpgradeableCluster; import org.apache.cassandra.distributed.impl.Instance; @@ -63,6 +65,7 @@ public static class TestCase implements Instance.ThrowingRunnable private RunOnCluster setup; private RunOnClusterAndNode runAfterNodeUpgrade; private RunOnCluster runAfterClusterUpgrade; + private final Set nodesToUpgrade = new HashSet<>(); public TestCase() { @@ -125,6 +128,9 @@ public void run() throws Throwable runAfterClusterUpgrade = (c) -> {}; if (runAfterNodeUpgrade == null) runAfterNodeUpgrade = (c, n) -> {}; + if (nodesToUpgrade.isEmpty()) + for (int n = 1; n <= nodeCount; n++) + nodesToUpgrade.add(n); for (TestVersions upgrade : this.upgrade) { @@ -134,7 +140,7 @@ public void run() throws Throwable for (Version version : upgrade.upgrade) { - for (int n = 1 ; n <= nodeCount ; ++n) + for (int n : nodesToUpgrade) { cluster.get(n).shutdown().get(); cluster.get(n).setVersion(version); @@ -148,6 +154,14 @@ public void run() throws Throwable } } + public TestCase nodesToUpgrade(int ... nodes) + { + for (int n : nodes) + { + nodesToUpgrade.add(n); + } + return this; + } } }