Skip to content

Commit

Permalink
In-JVM DTest: Add network topology and tracing support
Browse files Browse the repository at this point in the history
In-JVM DTest: readRepairTest - Set read repair query to CL.ALL

The current test relies on the order of nodes returned by the snitch
to include node3, which SimpleSnitch does.  With support for other
snitches coming, the test should be able to handle any order
of nodes - so make sure all nodes are present.

In-JVM DTest: remove minimum messaging service calculation

Match the change on trunk to resolve an issue with trying to call
getMessagingVersion on nodes that are not started.  Fixes
mixed-version upgrades once all branches are updated.

Patch by Jon Meredith; Reviewed by Dinesh Joshi and Alex Petrov for CASSANDRA-15319
  • Loading branch information
Jon Meredith authored and dineshjoshi committed Oct 8, 2019
1 parent 8dcaa12 commit 58a5ce1
Show file tree
Hide file tree
Showing 19 changed files with 738 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* Make tools/bin/token-generator py2/3 compatible (CASSANDRA-15012)
* Multi-version in-JVM dtests (CASSANDRA-14937)
* Allow instance class loaders to be garbage collected for inJVM dtest (CASSANDRA-15170)
* Add support for network topology and query tracing for inJVM dtest (CASSANDRA-15319)


2.2.14
Expand Down
8 changes: 7 additions & 1 deletion src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ private void joinTokenRing(int delay) throws ConfigurationException
}

// if we don't have system_traces keyspace at this point, then create it manually
maybeAddOrUpdateKeyspace(TraceKeyspace.definition());
ensureTraceKeyspace();
maybeAddOrUpdateKeyspace(SystemDistributedKeyspace.definition());

if (!isSurveyMode)
Expand Down Expand Up @@ -978,6 +978,12 @@ private void joinTokenRing(int delay) throws ConfigurationException
}
}

// Exposed for in-JVM dtests so a test can create the system_traces keyspace on
// demand; the normal path reaches this via joinTokenRing during startup.
@VisibleForTesting
public void ensureTraceKeyspace()
{
    maybeAddOrUpdateKeyspace(TraceKeyspace.definition());
}

public static boolean isReplacingSameAddress()
{
return DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,14 @@ protected IInvokableInstance newInstanceWrapper(int generation, Versions.Version
return new Wrapper(generation, version, config);
}

// Entry point for the fluent builder API; configure node count and topology via
// withNodes/withDCs/withRacks before calling start()/createWithoutStarting().
public static Builder<IInvokableInstance, Cluster> build()
{
    return new Builder<>(Cluster::new);
}

/**
 * Convenience overload: create a builder pre-configured with {@code nodeCount} nodes.
 * Equivalent to {@code build().withNodes(nodeCount)}.
 */
public static Builder<IInvokableInstance, Cluster> build(int nodeCount)
{
    // Diff residue left both the old constructor call and the replacement here;
    // the replacement (delegating to build()) is the surviving implementation.
    return build().withNodes(nodeCount);
}

public static Cluster create(int nodeCount, Consumer<InstanceConfig> configUpdater) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.cassandra.distributed;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;

import org.apache.cassandra.distributed.api.ICluster;
Expand Down Expand Up @@ -48,9 +46,14 @@ protected IUpgradeableInstance newInstanceWrapper(int generation, Versions.Versi
return new Wrapper(generation, version, config);
}

// Entry point for the fluent builder API; configure node count and topology via
// withNodes/withDCs/withRacks before calling start()/createWithoutStarting().
public static Builder<IUpgradeableInstance, UpgradeableCluster> build()
{
    return new Builder<>(UpgradeableCluster::new);
}

/**
 * Convenience overload: create a builder pre-configured with {@code nodeCount} nodes.
 * Equivalent to {@code build().withNodes(nodeCount)}.
 */
public static Builder<IUpgradeableInstance, UpgradeableCluster> build(int nodeCount)
{
    // Diff residue left both the old constructor call and the replacement here;
    // the replacement (delegating to build()) is the surviving implementation.
    return build().withNodes(nodeCount);
}

public static UpgradeableCluster create(int nodeCount) throws Throwable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public interface ICluster
IInstance get(InetAddressAndPort endpoint);
int size();
Stream<? extends IInstance> stream();
Stream<? extends IInstance> stream(String dcName);
Stream<? extends IInstance> stream(String dcName, String rackName);
IMessageFilters filters();

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.cassandra.distributed.api;

import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.Future;

// The cross-version API requires that a Coordinator can be constructed without any constructor arguments
public interface ICoordinator
Expand All @@ -28,4 +30,7 @@ public interface ICoordinator
Object[][] execute(String query, Enum<?> consistencyLevel, Object... boundValues);

Iterator<Object[]> executeWithPaging(String query, Enum<?> consistencyLevel, int pageSize, Object... boundValues);

Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, Enum<?> consistencyLevel, Object... boundValues);
Object[][] executeWithTracing(UUID sessionId, String query, Enum<?> consistencyLevel, Object... boundValues);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,27 @@

package org.apache.cassandra.distributed.api;

import org.apache.cassandra.locator.InetAddressAndPort;

import java.util.UUID;

import org.apache.cassandra.distributed.impl.NetworkTopology;
import org.apache.cassandra.locator.InetAddressAndPort;

public interface IInstanceConfig
{
int num();
UUID hostId();
InetAddressAndPort broadcastAddressAndPort();
NetworkTopology networkTopology();

/**
 * The rack this instance belongs to, looked up in the cluster-wide network
 * topology by the instance's broadcast address.
 * (Interface members are implicitly public; the redundant modifier is dropped.)
 */
default String localRack()
{
    return networkTopology().localRack(broadcastAddressAndPort());
}

/**
 * The datacenter this instance belongs to, looked up in the cluster-wide network
 * topology by the instance's broadcast address.
 * (Interface members are implicitly public; the redundant modifier is dropped.)
 */
default String localDatacenter()
{
    return networkTopology().localDC(broadcastAddressAndPort());
}

/**
* write the specified parameters to the Config object; we do not specify Config as the type to support a Config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import com.google.common.collect.Sets;
Expand All @@ -57,6 +58,7 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.SimpleCondition;

/**
Expand Down Expand Up @@ -193,13 +195,15 @@ public synchronized void setVersion(Versions.Version version)
}
}

protected AbstractCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader)
protected AbstractCluster(File root, Versions.Version version, List<InstanceConfig> configs,
ClassLoader sharedClassLoader)
{
this.root = root;
this.sharedClassLoader = sharedClassLoader;
this.instances = new ArrayList<>();
this.instanceMap = new HashMap<>();
int generation = AbstractCluster.generation.incrementAndGet();

for (InstanceConfig config : configs)
{
I instance = newInstanceWrapper(generation, version, config);
Expand Down Expand Up @@ -231,7 +235,20 @@ public int size()
{
return instances.size();
}

// All instances, in cluster order.
public Stream<I> stream()
{
    return instances.stream();
}

// Restrict the instance stream to nodes whose config reports the given datacenter.
public Stream<I> stream(String dcName)
{
    return instances.stream()
                    .filter(instance -> instance.config().localDatacenter().equals(dcName));
}

// Restrict the instance stream to nodes in the given datacenter AND rack.
// Two chained filters are equivalent to the single &&-combined predicate.
public Stream<I> stream(String dcName, String rackName)
{
    return stream(dcName)
           .filter(instance -> instance.config().localRack().equals(rackName));
}

public void forEach(IIsolatedExecutor.SerializableRunnable runnable) { forEach(i -> i.sync(runnable)); }
public void forEach(Consumer<? super I> consumer) { instances.forEach(consumer); }
public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit units)
Expand Down Expand Up @@ -353,15 +370,16 @@ protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>

public static class Builder<I extends IInstance, C extends AbstractCluster<I>>
{
private final int nodeCount;
private final Factory<I, C> factory;
private int nodeCount;
private int subnet;
private Map<Integer, Pair<String,String>> nodeIdTopology;
private File root;
private Versions.Version version;
private Consumer<InstanceConfig> configUpdater;
public Builder(int nodeCount, Factory<I, C> factory)

public Builder(Factory<I, C> factory)
{
this.nodeCount = nodeCount;
this.factory = factory;
}

Expand All @@ -371,6 +389,97 @@ public Builder<I, C> withSubnet(int subnet)
return this;
}

/**
 * Set the total node count.  If no topology is supplied later, all nodes land in a
 * single default DC/rack (see createWithoutStarting).
 */
public Builder<I, C> withNodes(int nodeCount)
{
    // Brace moved to its own line to match the Allman style used throughout this file.
    this.nodeCount = nodeCount;
    return this;
}

// Spread the previously supplied node count across dcCount datacenters,
// one rack per datacenter.
public Builder<I, C> withDCs(int dcCount)
{
    return withRacks(dcCount, 1);
}

/**
 * Spread the previously supplied node count across {@code dcCount} datacenters with
 * {@code racksPerDC} racks each.  Nodes per rack is the node count divided by the total
 * rack count, rounded up; the three-argument withRacks may then raise the node count so
 * every rack is full.
 *
 * @throws IllegalStateException if the node count has not been set yet
 */
public Builder<I, C> withRacks(int dcCount, int racksPerDC)
{
    // This overload derives nodes-per-rack from the total node count, so the count must
    // already be known.  (The previous message stated the opposite requirement.)
    if (nodeCount == 0)
        throw new IllegalStateException("Node count must be set with withNodes() before calling withRacks(dcCount, racksPerDC)");

    int totalRacks = dcCount * racksPerDC;
    int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer
    return withRacks(dcCount, racksPerDC, nodesPerRack);
}

/**
 * Lay out the cluster as {@code dcCount} datacenters, each with {@code racksPerDC}
 * racks of {@code nodesPerRack} nodes.  Node ids are assigned contiguously from 1,
 * filling each rack in turn.  If the resulting total exceeds a previously supplied
 * node count, the count is raised to match.
 *
 * @throws IllegalStateException if a topology has already been configured
 */
public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack)
{
    if (nodeIdTopology != null)
        throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");

    nodeIdTopology = new HashMap<>();
    int nodeId = 1;
    for (int dc = 1; dc <= dcCount; dc++)
    {
        for (int rack = 1; rack <= racksPerDC; rack++)
        {
            for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
                nodeIdTopology.put(nodeId++, Pair.create(dcName(dc), rackName(rack)));
        }
    }
    // adjust the node count to match the allocation
    final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack;
    if (adjustedNodeCount != nodeCount)
    {
        assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count";
        logger.info("Network topology of {} DCs with {} racks per DC and {} nodes per rack required increasing total nodes to {}",
                    dcCount, racksPerDC, nodesPerRack, adjustedNodeCount);
        nodeCount = adjustedNodeCount;
    }
    return this;
}

// Add a datacenter containing a single rack ("rack1") of nodeCount nodes.
public Builder<I, C> withDC(String dcName, int nodeCount)
{
    return withRack(dcName, rackName(1), nodeCount);
}

/**
 * Append a rack of {@code nodesInRack} nodes to the topology, assigning the next
 * contiguous node ids, and grow the node count accordingly.  Incompatible with an
 * explicitly set node count unless a topology has already been started.
 *
 * @throws IllegalStateException if a node count was set but no topology exists yet
 */
public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack)
{
    if (nodeIdTopology == null)
    {
        // First withRack/withDC call must own node-count allocation; a pre-set count
        // would leave nodes with no topology entry.
        if (nodeCount > 0)
            throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks");

        nodeIdTopology = new HashMap<>();
    }
    // New nodes get ids nodeCount+1 .. nodeCount+nodesInRack, keeping ids contiguous.
    for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
        nodeIdTopology.put(nodeId, Pair.create(dcName, rackName));

    nodeCount += nodesInRack;
    return this;
}

// Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
public Builder<I, C> withNodeIdTopology(Map<Integer,Pair<String,String>> nodeIdTopology)
{
    if (nodeIdTopology.isEmpty())
        throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");

    // Ids must run contiguously from 1 to size; fail on the first gap encountered.
    for (int nodeId = 1; nodeId <= nodeIdTopology.size(); nodeId++)
    {
        if (nodeIdTopology.get(nodeId) == null)
            throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId);
    }

    if (nodeCount != nodeIdTopology.size())
    {
        nodeCount = nodeIdTopology.size();
        logger.info("Adjusting node count to {} for supplied network topology", nodeCount);
    }

    // Defensive copy so later mutation by the caller cannot affect the builder.
    this.nodeIdTopology = new HashMap<>(nodeIdTopology);
    return this;
}

public Builder<I, C> withRoot(File root)
{
this.root = root;
Expand All @@ -396,27 +505,42 @@ public C createWithoutStarting() throws IOException

if (root == null)
root = Files.createTempDirectory("dtests").toFile();

if (version == null)
version = Versions.CURRENT;

if (nodeCount <= 0)
throw new IllegalStateException("Cluster must have at least one node");

if (nodeIdTopology == null)
nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
.collect(Collectors.toMap(nodeId -> nodeId,
nodeId -> Pair.create(dcName(0), rackName(0))));

root.mkdirs();
setupLogging(root);

ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();

List<InstanceConfig> configs = new ArrayList<>();
long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / nodeCount);
for (int i = 0; i < nodeCount; ++i)

String ipPrefix = "127.0." + subnet + ".";

NetworkTopology networkTopology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);

for (int i = 0 ; i < nodeCount ; ++i)
{
InstanceConfig config = InstanceConfig.generate(i + 1, subnet, root, String.valueOf(token));
int nodeNum = i + 1;
String ipAddress = ipPrefix + nodeNum;
InstanceConfig config = InstanceConfig.generate(i + 1, ipAddress, networkTopology, root, String.valueOf(token));
if (configUpdater != null)
configUpdater.accept(config);
configs.add(config);
token += increment;
}

C cluster = factory.newCluster(root, version, configs, sharedClassLoader);
return cluster;
return factory.newCluster(root, version, configs, sharedClassLoader);
}

public C start() throws IOException
Expand All @@ -427,18 +551,29 @@ public C start() throws IOException
}
}

/**
 * Synthesized datacenter name for the given index, e.g. {@code dcName(1)} -> "datacenter1".
 */
static String dcName(int index)
{
    return String.format("datacenter%d", index);
}

/**
 * Synthesized rack name for the given index, e.g. {@code rackName(1)} -> "rack1".
 */
static String rackName(int index)
{
    return String.format("rack%d", index);
}

private static void setupLogging(File root)
{
try
{
String testConfPath = "test/conf/logback-dtest.xml";
Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");

if (!logConfPath.toFile().exists())
{
Files.copy(new File(testConfPath).toPath(),
logConfPath);
}

System.setProperty("logback.configurationFile", "file://" + logConfPath);
}
catch (IOException e)
Expand Down
Loading

0 comments on commit 58a5ce1

Please sign in to comment.