Skip to content

Commit

Permalink
Rebuild from targeted replica (CASSANDRA-9875)
Browse files Browse the repository at this point in the history
  • Loading branch information
Cameron Zemek committed Oct 22, 2020
1 parent 94e9149 commit b712e0d
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 4 deletions.
18 changes: 18 additions & 0 deletions src/java/org/apache/cassandra/dht/RangeStreamer.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@ public boolean shouldInclude(InetAddress endpoint)
}
}

/**
* Source filter which only includes endpoints contained within a provided set.
*/
public static class WhitelistedSourcesFilter implements ISourceFilter
{
private final Set<InetAddress> whitelistedSources;

public WhitelistedSourcesFilter(Set<InetAddress> whitelistedSources)
{
this.whitelistedSources = whitelistedSources;
}

public boolean shouldInclude(InetAddress endpoint)
{
return whitelistedSources.contains(endpoint);
}
}

public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddress address, String description)
{
this.metadata = metadata;
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.management.ObjectName;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.FBUtilities;

public class EndpointSnitchInfo implements EndpointSnitchInfoMBean
{
Expand All @@ -46,6 +47,11 @@ public String getDatacenter(String host) throws UnknownHostException
return DatabaseDescriptor.getEndpointSnitch().getDatacenter(InetAddress.getByName(host));
}

public String getDatacenter()
{
return DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
}

public String getRack(String host) throws UnknownHostException
{
return DatabaseDescriptor.getEndpointSnitch().getRack(InetAddress.getByName(host));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public interface EndpointSnitchInfoMBean
*/
public String getDatacenter(String host) throws UnknownHostException;

/**
* Provides the Datacenter name depending on the respective snitch used for this node
*/
public String getDatacenter();

/**
* Provides the snitch name of the cluster
Expand Down
94 changes: 90 additions & 4 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
Expand All @@ -52,6 +53,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.MatchResult;
import java.util.regex.Pattern;

import javax.management.JMX;
import javax.management.MBeanServer;
Expand Down Expand Up @@ -1043,13 +1046,26 @@ public boolean isJoined()

public void rebuild(String sourceDc)
{
// check on going rebuild
rebuild(sourceDc, null, null, null);
}

public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources)
{
// check ongoing rebuild
if (!isRebuilding.compareAndSet(false, true))
{
throw new IllegalStateException("Node is still rebuilding. Check nodetool netstats.");
}

logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
// check the arguments
if (keyspace == null && tokens != null)
{
throw new IllegalArgumentException("Cannot specify tokens without keyspace.");
}

logger.info("rebuild from dc: {}, {}, {}", sourceDc == null ? "(any dc)" : sourceDc,
keyspace == null ? "(All keyspaces)" : keyspace,
tokens == null ? "(All tokens)" : tokens);

try
{
Expand All @@ -1058,8 +1074,78 @@ public void rebuild(String sourceDc)
if (sourceDc != null)
streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));

for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName));
if (keyspace == null)
{
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName));
}
else if (tokens == null)
{
streamer.addRanges(keyspace, getLocalRanges(keyspace));
}
else
{
Token.TokenFactory factory = getPartitioner().getTokenFactory();
List<Range<Token>> ranges = new ArrayList<>();
Pattern rangePattern = Pattern.compile("\\(\\s*(-?\\w+)\\s*,\\s*(-?\\w+)\\s*\\]");
try (Scanner tokenScanner = new Scanner(tokens))
{
while (tokenScanner.findInLine(rangePattern) != null)
{
MatchResult range = tokenScanner.match();
Token startToken = factory.fromString(range.group(1));
Token endToken = factory.fromString(range.group(2));
logger.info("adding range: ({},{}]", startToken, endToken);
ranges.add(new Range<>(startToken, endToken));
}
if (tokenScanner.hasNext())
throw new IllegalArgumentException("Unexpected string: " + tokenScanner.next());
}

// Ensure all specified ranges are actually ranges owned by this host
Collection<Range<Token>> localRanges = getLocalRanges(keyspace);
for (Range<Token> specifiedRange : ranges)
{
boolean foundParentRange = false;
for (Range<Token> localRange : localRanges)
{
if (localRange.contains(specifiedRange))
{
foundParentRange = true;
break;
}
}
if (!foundParentRange)
{
throw new IllegalArgumentException(String.format("The specified range %s is not a range that is owned by this node. Please ensure that all token ranges specified to be rebuilt belong to this node.", specifiedRange.toString()));
}
}

if (specificSources != null)
{
String[] stringHosts = specificSources.split(",");
Set<InetAddress> sources = new HashSet<>(stringHosts.length);
for (String stringHost : stringHosts)
{
try
{
InetAddress endpoint = InetAddress.getByName(stringHost);
if (FBUtilities.getBroadcastAddress().equals(endpoint))
{
throw new IllegalArgumentException("This host was specified as a source for rebuilding. Sources for a rebuild can only be other nodes in the cluster.");
}
sources.add(endpoint);
}
catch (UnknownHostException ex)
{
throw new IllegalArgumentException("Unknown host specified " + stringHost, ex);
}
}
streamer.addSourceFilter(new RangeStreamer.WhitelistedSourcesFilter(sources));
}

streamer.addRanges(keyspace, ranges);
}

StreamResultFuture resultFuture = streamer.fetchAsync();
// wait for result
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/service/StorageServiceMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,16 @@ public interface StorageServiceMBean extends NotificationEmitter
*/
public void rebuild(String sourceDc);

/**
* Same as {@link #rebuild(String)}, but only for specified keyspace and ranges.
*
* @param sourceDc Name of DC from which to select sources for streaming or null to pick any node
* @param keyspace Name of the keyspace which to rebuild or null to rebuild all keyspaces.
* @param tokens Range of tokens to rebuild or null to rebuild all token ranges. In the format of:
* "(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]"
*/
public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources);

/** Starts a bulk load and blocks until it completes. */
public void bulkLoad(String directory);

Expand Down

0 comments on commit b712e0d

Please sign in to comment.