diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index aef588eff567..64d00861b4d0 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -122,6 +122,24 @@ public boolean shouldInclude(InetAddress endpoint) } } + /** + * Source filter which only includes endpoints contained within a provided set. + */ + public static class WhitelistedSourcesFilter implements ISourceFilter + { + private final Set whitelistedSources; + + public WhitelistedSourcesFilter(Set whitelistedSources) + { + this.whitelistedSources = whitelistedSources; + } + + public boolean shouldInclude(InetAddress endpoint) + { + return whitelistedSources.contains(endpoint); + } + } + public RangeStreamer(TokenMetadata metadata, Collection tokens, InetAddress address, diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index b1d8e265ebee..bf34c1ee2576 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -80,6 +80,10 @@ import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport; import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport; +import java.util.Scanner; +import java.util.regex.MatchResult; +import java.util.regex.Pattern; + import static java.util.concurrent.TimeUnit.MINUTES; /** @@ -1111,12 +1115,26 @@ public boolean isJoined() public void rebuild(String sourceDc) { // check on going rebuild + rebuild(sourceDc, null, null, null); + } + + public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources) + { + // check ongoing rebuild if (!isRebuilding.compareAndSet(false, true)) { throw new IllegalStateException("Node is still rebuilding. Check nodetool netstats."); } - logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc); + // check the arguments + if (keyspace == null && tokens != null) + { + throw new IllegalArgumentException("Cannot specify tokens without keyspace."); + } + + logger.info("rebuild from dc: {}, {}, {}", sourceDc == null ? "(any dc)" : sourceDc, + keyspace == null ? "(All keyspaces)" : keyspace, + tokens == null ? "(All tokens)" : tokens); try { @@ -1131,8 +1149,78 @@ public void rebuild(String sourceDc) if (sourceDc != null) streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc)); - for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) - streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName)); + if (keyspace == null) + { + for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) + streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName)); + } + else if (tokens == null) + { + streamer.addRanges(keyspace, getLocalRanges(keyspace)); + } + else + { + Token.TokenFactory factory = getPartitioner().getTokenFactory(); + List> ranges = new ArrayList<>(); + Pattern rangePattern = Pattern.compile("\\(\\s*(-?\\w+)\\s*,\\s*(-?\\w+)\\s*\\]"); + try (Scanner tokenScanner = new Scanner(tokens)) + { + while (tokenScanner.findInLine(rangePattern) != null) + { + MatchResult range = tokenScanner.match(); + Token startToken = factory.fromString(range.group(1)); + Token endToken = factory.fromString(range.group(2)); + logger.info("adding range: ({},{}]", startToken, endToken); + ranges.add(new Range<>(startToken, endToken)); + } + if (tokenScanner.hasNext()) + throw new IllegalArgumentException("Unexpected string: " + tokenScanner.next()); + } + + // Ensure all specified ranges are actually ranges owned by this host + Collection> localRanges = getLocalRanges(keyspace); + for (Range specifiedRange : ranges) + { + boolean foundParentRange = false; + for (Range localRange : localRanges) + { + if (localRange.contains(specifiedRange)) + { + foundParentRange = true; + break; + } + } + if (!foundParentRange) + { + throw new IllegalArgumentException(String.format("The specified range %s is not a range that is owned by this node. Please ensure that all token ranges specified to be rebuilt belong to this node.", specifiedRange.toString())); + } + } + + if (specificSources != null) + { + String[] stringHosts = specificSources.split(","); + Set sources = new HashSet<>(stringHosts.length); + for (String stringHost : stringHosts) + { + try + { + InetAddress endpoint = InetAddress.getByName(stringHost); + if (FBUtilities.getBroadcastAddress().equals(endpoint)) + { + throw new IllegalArgumentException("This host was specified as a source for rebuilding. Sources for a rebuild can only be other nodes in the cluster."); + } + sources.add(endpoint); + } + catch (UnknownHostException ex) + { + throw new IllegalArgumentException("Unknown host specified " + stringHost, ex); + } + } + streamer.addSourceFilter(new RangeStreamer.WhitelistedSourcesFilter(sources)); + } + + streamer.addRanges(keyspace, ranges); + } StreamResultFuture resultFuture = streamer.fetchAsync(); // wait for result diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 43d26c64bc83..5a4d11347f5a 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -519,6 +519,16 @@ public interface StorageServiceMBean extends NotificationEmitter */ public void rebuild(String sourceDc); + /** + * Same as {@link #rebuild(String)}, but only for specified keyspace and ranges. + * + * @param sourceDc Name of DC from which to select sources for streaming or null to pick any node + * @param keyspace Name of the keyspace which to rebuild or null to rebuild all keyspaces. + * @param tokens Range of tokens to rebuild or null to rebuild all token ranges. In the format of: + * "(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]" + */ + public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources); + /** Starts a bulk load and blocks until it completes. */ public void bulkLoad(String directory);