Skip to content

Commit

Permalink
Rebuild from targeted replica (CASSANDRA-9875)
Browse files Browse the repository at this point in the history
  • Loading branch information
Cameron Zemek committed Oct 22, 2020
1 parent 4ee4cee commit 30ca748
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 3 deletions.
18 changes: 18 additions & 0 deletions src/java/org/apache/cassandra/dht/RangeStreamer.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,24 @@ public boolean shouldInclude(InetAddress endpoint)
}
}

/**
 * Source filter that accepts an endpoint only when it is a member of the
 * set handed to the constructor; every other endpoint is rejected.
 */
public static class WhitelistedSourcesFilter implements ISourceFilter
{
    /** Endpoints accepted by this filter. Held by reference: the caller's set is not copied. */
    private final Set<InetAddress> allowedSources;

    /**
     * @param whitelistedSources the endpoints that {@link #shouldInclude(InetAddress)} will accept
     */
    public WhitelistedSourcesFilter(Set<InetAddress> whitelistedSources)
    {
        allowedSources = whitelistedSources;
    }

    /**
     * @param endpoint the candidate source endpoint
     * @return {@code true} if {@code endpoint} is contained in the set supplied at construction
     */
    public boolean shouldInclude(InetAddress endpoint)
    {
        final boolean permitted = allowedSources.contains(endpoint);
        return permitted;
    }
}

public RangeStreamer(TokenMetadata metadata,
Collection<Token> tokens,
InetAddress address,
Expand Down
94 changes: 91 additions & 3 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@
import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport;

import java.util.Scanner;
import java.util.regex.MatchResult;
import java.util.regex.Pattern;

import static java.util.concurrent.TimeUnit.MINUTES;

/**
Expand Down Expand Up @@ -1111,12 +1115,26 @@ public boolean isJoined()
/**
* Convenience overload that delegates to
* {@link #rebuild(String, String, String, String)}, passing {@code null}
* for the keyspace, tokens and specificSources arguments.
*
* @param sourceDc forwarded unchanged as the first argument of the four-argument overload
*/
public void rebuild(String sourceDc)
{
// no keyspace, token-range or source-host restriction: the three extra arguments are all null
rebuild(sourceDc, null, null, null);
}

public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources)
{
// check ongoing rebuild
if (!isRebuilding.compareAndSet(false, true))
{
throw new IllegalStateException("Node is still rebuilding. Check nodetool netstats.");
}

logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
// check the arguments
if (keyspace == null && tokens != null)
{
throw new IllegalArgumentException("Cannot specify tokens without keyspace.");
}

logger.info("rebuild from dc: {}, {}, {}", sourceDc == null ? "(any dc)" : sourceDc,
keyspace == null ? "(All keyspaces)" : keyspace,
tokens == null ? "(All tokens)" : tokens);

try
{
Expand All @@ -1131,8 +1149,78 @@ public void rebuild(String sourceDc)
if (sourceDc != null)
streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));

for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName));
if (keyspace == null)
{
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName));
}
else if (tokens == null)
{
streamer.addRanges(keyspace, getLocalRanges(keyspace));
}
else
{
Token.TokenFactory factory = getPartitioner().getTokenFactory();
List<Range<Token>> ranges = new ArrayList<>();
Pattern rangePattern = Pattern.compile("\\(\\s*(-?\\w+)\\s*,\\s*(-?\\w+)\\s*\\]");
try (Scanner tokenScanner = new Scanner(tokens))
{
while (tokenScanner.findInLine(rangePattern) != null)
{
MatchResult range = tokenScanner.match();
Token startToken = factory.fromString(range.group(1));
Token endToken = factory.fromString(range.group(2));
logger.info("adding range: ({},{}]", startToken, endToken);
ranges.add(new Range<>(startToken, endToken));
}
if (tokenScanner.hasNext())
throw new IllegalArgumentException("Unexpected string: " + tokenScanner.next());
}

// Ensure all specified ranges are actually ranges owned by this host
Collection<Range<Token>> localRanges = getLocalRanges(keyspace);
for (Range<Token> specifiedRange : ranges)
{
boolean foundParentRange = false;
for (Range<Token> localRange : localRanges)
{
if (localRange.contains(specifiedRange))
{
foundParentRange = true;
break;
}
}
if (!foundParentRange)
{
throw new IllegalArgumentException(String.format("The specified range %s is not a range that is owned by this node. Please ensure that all token ranges specified to be rebuilt belong to this node.", specifiedRange.toString()));
}
}

if (specificSources != null)
{
String[] stringHosts = specificSources.split(",");
Set<InetAddress> sources = new HashSet<>(stringHosts.length);
for (String stringHost : stringHosts)
{
try
{
InetAddress endpoint = InetAddress.getByName(stringHost);
if (FBUtilities.getBroadcastAddress().equals(endpoint))
{
throw new IllegalArgumentException("This host was specified as a source for rebuilding. Sources for a rebuild can only be other nodes in the cluster.");
}
sources.add(endpoint);
}
catch (UnknownHostException ex)
{
throw new IllegalArgumentException("Unknown host specified " + stringHost, ex);
}
}
streamer.addSourceFilter(new RangeStreamer.WhitelistedSourcesFilter(sources));
}

streamer.addRanges(keyspace, ranges);
}

StreamResultFuture resultFuture = streamer.fetchAsync();
// wait for result
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/service/StorageServiceMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,16 @@ public interface StorageServiceMBean extends NotificationEmitter
*/
public void rebuild(String sourceDc);

/**
* Same as {@link #rebuild(String)}, but restricted to the specified keyspace, token ranges and source hosts.
*
* @param sourceDc Name of the DC from which to select sources for streaming, or null to pick any node.
* @param keyspace Name of the keyspace to rebuild, or null to rebuild all keyspaces.
* @param tokens Ranges of tokens to rebuild, or null to rebuild all token ranges. In the format of:
* "(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]"
* The StorageService implementation rejects tokens given without a keyspace
* with an IllegalArgumentException.
* @param specificSources Comma-separated list of hosts to stream from, or null for no host restriction.
* The StorageService implementation resolves each entry with InetAddress.getByName
* and rejects this node's own broadcast address.
* NOTE(review): that implementation only reads this argument when tokens is
* also non-null - confirm whether that is intended.
*/
public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources);

/** Starts a bulk load and blocks until it completes. */
public void bulkLoad(String directory);

Expand Down

0 comments on commit 30ca748

Please sign in to comment.