Skip to content

Commit

Permalink
Merge branch 'cassandra-3.0' into cassandra-3.11
Browse files Browse the repository at this point in the history
  • Loading branch information
ifesdjeen committed Feb 5, 2020
2 parents b33a11c + 3320f08 commit c09e709
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 60 deletions.
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/service/GCInspector.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;

import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.StatusLogger;

public class GCInspector implements NotificationListener, GCInspectorMXBean
Expand Down Expand Up @@ -147,7 +148,7 @@ public GCInspector()
gcStates.put(gc.getName(), new GCState(gc, assumeGCIsPartiallyConcurrent(gc), assumeGCIsOldGen(gc)));
}

mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
MBeanWrapper.instance.registerMBean(this, new ObjectName(MBEAN_NAME));
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ default public String localDatacenter()
*/
void propagate(Object writeToConfig);

/**
* Validates whether the config properties are within range of accepted values.
*/
void validate();
Object get(String fieldName);
String getString(String fieldName);
int getInt(String fieldName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.SimpleCondition;

/**
Expand Down Expand Up @@ -101,6 +100,8 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
private final List<I> instances;
private final Map<InetAddressAndPort, I> instanceMap;

private final Versions.Version initialVersion;

// mutated by user-facing API
private final MessageFilters filters;

Expand Down Expand Up @@ -131,7 +132,7 @@ public Wrapper(int generation, Versions.Version version, InstanceConfig config)
private IInvokableInstance newInstance(int generation)
{
ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader);
return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>) Instance::new, classLoader)
.apply(config, classLoader);
}

Expand Down Expand Up @@ -209,18 +210,19 @@ public synchronized void setVersion(Versions.Version version)
}
}

protected AbstractCluster(File root, Versions.Version version, List<InstanceConfig> configs,
protected AbstractCluster(File root, Versions.Version initialVersion, List<InstanceConfig> configs,
ClassLoader sharedClassLoader)
{
this.root = root;
this.sharedClassLoader = sharedClassLoader;
this.instances = new ArrayList<>();
this.instanceMap = new HashMap<>();
this.initialVersion = initialVersion;
int generation = AbstractCluster.generation.incrementAndGet();

for (InstanceConfig config : configs)
{
I instance = newInstanceWrapper(generation, version, config);
I instance = newInstanceWrapperInternal(generation, initialVersion, config);
instances.add(instance);
// we use the config().broadcastAddressAndPort() here because we have not initialised the Instance
I prev = instanceMap.put(instance.broadcastAddressAndPort(), instance);
Expand All @@ -232,25 +234,62 @@ protected AbstractCluster(File root, Versions.Version version, List<InstanceConf

protected abstract I newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config);

protected I newInstanceWrapperInternal(int generation, Versions.Version version, InstanceConfig config)
{
config.validate();
return newInstanceWrapper(generation, version, config);
}

public I bootstrap(InstanceConfig config)
{
if (!config.has(Feature.GOSSIP) || !config.has(Feature.NETWORK))
throw new IllegalStateException("New nodes can only be bootstrapped when gossip and networking is enabled.");

I instance = newInstanceWrapperInternal(0, initialVersion, config);

instances.add(instance);
I prev = instanceMap.put(config.broadcastAddressAndPort(), instance);

if (null != prev)
{
throw new IllegalStateException(String.format("This cluster already contains a node (%d) with with same address and port: %s",
config.num,
instance));
}

return instance;
}

/**
* WARNING: we index from 1 here, for consistency with inet address!
*/
public ICoordinator coordinator(int node)
{
return instances.get(node - 1).coordinator();
}

/**
* WARNING: we index from 1 here, for consistency with inet address!
*/
public I get(int node) { return instances.get(node - 1); }
public I get(InetAddressAndPort addr) { return instanceMap.get(addr); }
public I get(int node)
{
return instances.get(node - 1);
}

public I get(InetAddressAndPort addr)
{
return instanceMap.get(addr);
}

public int size()
{
return instances.size();
}

public Stream<I> stream() { return instances.stream(); }
public Stream<I> stream()
{
return instances.stream();
}

public Stream<I> stream(String dcName)
{
Expand All @@ -263,13 +302,24 @@ public Stream<I> stream(String dcName, String rackName)
i.config().localRack().equals(rackName));
}

public void forEach(IIsolatedExecutor.SerializableRunnable runnable) { forEach(i -> i.sync(runnable)); }
public void forEach(Consumer<? super I> consumer) { forEach(instances, consumer); }
public void forEach(List<I> instancesForOp, Consumer<? super I> consumer) { instancesForOp.forEach(consumer); }
public void forEach(IIsolatedExecutor.SerializableRunnable runnable)
{
forEach(i -> i.sync(runnable));
}

public void forEach(Consumer<? super I> consumer)
{
forEach(instances, consumer);
}

public void forEach(List<I> instancesForOp, Consumer<? super I> consumer)
{
instancesForOp.forEach(consumer);
}

public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
{
parallelForEach(instances, consumer, timeout, unit);
parallelForEach(instances, consumer, timeout, unit);
}

public void parallelForEach(List<I> instances, IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
Expand Down Expand Up @@ -315,12 +365,12 @@ public void schemaChange(String query)

private void updateMessagingVersions()
{
for (IInstance reportTo: instances)
for (IInstance reportTo : instances)
{
if (reportTo.isShutdown())
continue;

for (IInstance reportFrom: instances)
for (IInstance reportFrom : instances)
{
if (reportFrom == reportTo || reportFrom.isShutdown())
continue;
Expand Down Expand Up @@ -390,15 +440,14 @@ private void startPolling()
}



/**
* Will wait for a schema change AND agreement that occurs after it is created
* (and precedes the invocation to waitForAgreement)
*
* <p>
* Works by simply checking if all UUIDs agree after any schema version change event,
* so long as the waitForAgreement method has been entered (indicating the change has
* taken place on the coordinator)
*
* <p>
* This could perhaps be made a little more robust, but this should more than suffice.
*/
public class SchemaChangeMonitor extends ChangeMonitor
Expand Down Expand Up @@ -461,9 +510,11 @@ public void startup()
// and then start any instances with it disabled in parallel.
List<I> startSequentially = new ArrayList<>();
List<I> startParallel = new ArrayList<>();
for (I instance : instances)
for (int i = 0; i < instances.size(); i++)
{
if ((boolean) instance.config().get("auto_bootstrap"))
I instance = instances.get(i);

if (i == 0 || (boolean) instance.config().get("auto_bootstrap"))
startSequentially.add(instance);
else
startParallel.add(instance);
Expand All @@ -485,23 +536,31 @@ public static class Builder<I extends IInstance, C extends AbstractCluster<I>>
private final Factory<I, C> factory;
private int nodeCount;
private int subnet;
private Map<Integer, Pair<String,String>> nodeIdTopology;
private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
private TokenSupplier tokenSupplier;
private File root;
private Versions.Version version;
private Versions.Version version = Versions.CURRENT;
private Consumer<InstanceConfig> configUpdater;

public Builder(Factory<I, C> factory)
{
this.factory = factory;
}

public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier)
{
this.tokenSupplier = tokenSupplier;
return this;
}

public Builder<I, C> withSubnet(int subnet)
{
this.subnet = subnet;
return this;
}

public Builder<I, C> withNodes(int nodeCount) {
public Builder<I, C> withNodes(int nodeCount)
{
this.nodeCount = nodeCount;
return this;
}
Expand Down Expand Up @@ -533,7 +592,7 @@ public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack)
for (int rack = 1; rack <= racksPerDC; rack++)
{
for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
nodeIdTopology.put(nodeId++, Pair.create(dcName(dc), rackName(rack)));
nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
}
}
// adjust the node count to match the allocatation
Expand Down Expand Up @@ -563,14 +622,14 @@ public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack)
nodeIdTopology = new HashMap<>();
}
for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
nodeIdTopology.put(nodeId, Pair.create(dcName, rackName));
nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));

nodeCount += nodesInRack;
return this;
}

// Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
public Builder<I, C> withNodeIdTopology(Map<Integer,Pair<String,String>> nodeIdTopology)
public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
{
if (nodeIdTopology.isEmpty())
throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
Expand All @@ -584,7 +643,6 @@ public Builder<I, C> withNodeIdTopology(Map<Integer,Pair<String,String>> nodeIdT
{
nodeCount = nodeIdTopology.size();
logger.info("Adjusting node count to {} for supplied network topology", nodeCount);

}

this.nodeIdTopology = new HashMap<>(nodeIdTopology);
Expand Down Expand Up @@ -612,50 +670,59 @@ public Builder<I, C> withConfig(Consumer<InstanceConfig> updater)

public C createWithoutStarting() throws IOException
{
File root = this.root;
Versions.Version version = this.version;

if (root == null)
root = Files.createTempDirectory("dtests").toFile();

if (version == null)
version = Versions.CURRENT;

if (nodeCount <= 0)
throw new IllegalStateException("Cluster must have at least one node");

if (nodeIdTopology == null)
{
nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
.collect(Collectors.toMap(nodeId -> nodeId,
nodeId -> Pair.create(dcName(0), rackName(0))));
nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
}

root.mkdirs();
setupLogging(root);

ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();

List<InstanceConfig> configs = new ArrayList<>();
long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / nodeCount);

String ipPrefix = "127.0." + subnet + ".";
String seedIp = ipPrefix + "1";

NetworkTopology networkTopology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);
if (tokenSupplier == null)
tokenSupplier = evenlyDistributedTokens(nodeCount);

for (int i = 0 ; i < nodeCount ; ++i)
for (int i = 0; i < nodeCount; ++i)
{
int nodeNum = i + 1;
String ipAddress = ipPrefix + nodeNum;
InstanceConfig config = InstanceConfig.generate(i + 1, ipAddress, networkTopology, root, String.valueOf(token), seedIp);
if (configUpdater != null)
configUpdater.accept(config);
configs.add(config);
token += increment;
configs.add(createInstanceConfig(nodeNum));
}

return factory.newCluster(root, version, configs, sharedClassLoader);
}

public InstanceConfig newInstanceConfig(C cluster)
{
return createInstanceConfig(cluster.size() + 1);
}

private InstanceConfig createInstanceConfig(int nodeNum)
{
String ipPrefix = "127.0." + subnet + ".";
String seedIp = ipPrefix + "1";
String ipAddress = ipPrefix + nodeNum;
long token = tokenSupplier.token(nodeNum);

NetworkTopology topology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);

InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
if (configUpdater != null)
configUpdater.accept(config);

return config;
}

public C start() throws IOException
{
C cluster = createWithoutStarting();
Expand All @@ -664,6 +731,21 @@ public C start() throws IOException
}
}

public static TokenSupplier evenlyDistributedTokens(int numNodes)
{
long increment = (Long.MAX_VALUE / numNodes) * 2;
return (int nodeId) -> {
assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
nodeId, numNodes);
return Long.MIN_VALUE + 1 + nodeId * increment;
};
}

public static interface TokenSupplier
{
public long token(int nodeId);
}

static String dcName(int index)
{
return "datacenter" + index;
Expand Down
Loading

0 comments on commit c09e709

Please sign in to comment.